From bbc708d1c531d7ec480273d7d88cb40d3d685cd0 Mon Sep 17 00:00:00 2001 From: RioAngele <1921292838@qq.com> Date: Tue, 23 May 2023 17:06:10 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E9=87=8D=E8=BD=BD=E4=BA=86=20=E4=BD=8D?= =?UTF-8?q?=E4=BA=8ERecover.java,pageCacheImpl.java=20=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=E4=B8=A4=E4=B8=AA=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../guoziyang/mydb/backend/dm/Recover.java | 25 ++++++++++++++- .../backend/dm/pageCache/PageCacheImpl.java | 31 +++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) diff --git a/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java b/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java index 42fc076..9145518 100644 --- a/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java +++ b/src/main/java/top/guoziyang/mydb/backend/dm/Recover.java @@ -86,7 +86,7 @@ private static void redoTranscations(TransactionManager tm, Logger lg, PageCache InsertLogInfo li = parseInsertLog(log); long xid = li.xid; if(!tm.isActive(xid)) { - doInsertLog(pc, log, REDO); + doInsertLog(pc, li, REDO); } } else { UpdateLogInfo xi = parseUpdateLog(log); @@ -240,4 +240,27 @@ private static void doInsertLog(PageCache pc, byte[] log, int flag) { pg.release(); } } + + /* + * 重载doInsertLog(),减少log重复解析消耗 + * 作者:RioAngele + * 时间:2023.5.23 + */ + + private static void doInsertLog(PageCache pc,InsertLogInfo li , int flag) { + Page pg = null; + try { + pg = pc.getPage(li.pgno); + } catch(Exception e) { + Panic.panic(e); + } + try { + if(flag == UNDO) { + DataItem.setDataItemRawInvalid(li.raw); + } + PageX.recoverInsert(pg, li.raw, li.offset); + } finally { + pg.release(); + } + } } diff --git a/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java b/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java index 9e8a836..ca9d1a7 100644 --- a/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java +++ b/src/main/java/top/guoziyang/mydb/backend/dm/pageCache/PageCacheImpl.java @@ -41,11 +41,14 @@ public class PageCacheImpl extends AbstractCache implements PageCache { this.fileLock = new ReentrantLock(); this.pageNumbers = new AtomicInteger((int)length / PAGE_SIZE); } - +/* + * 改用重载方法flush() + */ public int newPage(byte[] initData) { int pgno = pageNumbers.incrementAndGet(); - Page pg = new PageImpl(pgno, initData, null); - flush(pg); + // Page pg = new PageImpl(pgno, initData, null); + // flush(pg); + flush(pgno, initData); return pgno; } @@ -106,6 +109,28 @@ private void flush(Page pg) { } } + /* + * 重载 flush()方法 + * 作者:RioAngle + * 时间:2023.5.23 + */ + + private void flush(int pgno,byte[] data) { + long offset = pageOffset(pgno); + + fileLock.lock(); + try { + ByteBuffer buf = ByteBuffer.wrap(data); + fc.position(offset); + fc.write(buf); + fc.force(false); + } catch(IOException e) { + Panic.panic(e); + } finally { + fileLock.unlock(); + } + } + public void truncateByBgno(int maxPgno) { long size = pageOffset(maxPgno + 1); try { From 1fde5c461ff777b86fe0c41837bf2b85dec12645 Mon Sep 17 00:00:00 2001 From: RioAngele <1921292838@qq.com> Date: Mon, 29 May 2023 17:48:38 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20Netty=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=EF=BC=8C=E6=9C=8D=E5=8A=A1=E7=AB=AF=E7=9A=84?= =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=B8=8E=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 10 ++- .../top/guoziyang/mydb/backend/Launcher.java | 11 ++- .../mydb/backend/server/NettyServer.java | 69 +++++++++++++++ .../backend/server/NettyServerHandler.java | 45 ++++++++++ .../top/guoziyang/mydb/client/Launcher.java | 19 +++-- .../guoziyang/mydb/client/NettyClient.java | 84 +++++++++++++++++++ .../mydb/client/NettyClientHandler.java | 24 ++++++ .../java/top/guoziyang/mydb/client/Shell.java | 10 ++- .../mydb/transport/DecoderHandler.java | 26 ++++++ .../mydb/transport/EncoderHandler.java | 19 +++++ .../guoziyang/mydb/transport/CodecTest.java | 45 ++++++++++ 11 files changed, 350 insertions(+), 12 deletions(-) create mode 100644 src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java create mode 100644 src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java create mode 100644 src/main/java/top/guoziyang/mydb/client/NettyClient.java create mode 100644 src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java create mode 100644 src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java create mode 100644 src/main/java/top/guoziyang/mydb/transport/EncoderHandler.java create mode 100644 src/test/java/top/guoziyang/mydb/transport/CodecTest.java diff --git a/pom.xml b/pom.xml index f16996e..3c4887a 100644 --- a/pom.xml +++ b/pom.xml @@ -10,8 +10,8 @@ UTF-8 - 11 - 11 + 17 + 17 @@ -41,5 +41,11 @@ commons-cli 1.5.0 + + + io.netty + netty-all + 5.0.0.Alpha1 + diff --git a/src/main/java/top/guoziyang/mydb/backend/Launcher.java b/src/main/java/top/guoziyang/mydb/backend/Launcher.java index eb79e35..344f91e 100644 --- a/src/main/java/top/guoziyang/mydb/backend/Launcher.java +++ b/src/main/java/top/guoziyang/mydb/backend/Launcher.java @@ -7,6 +7,7 @@ import org.apache.commons.cli.ParseException; import top.guoziyang.mydb.backend.dm.DataManager; +import top.guoziyang.mydb.backend.server.NettyServer; import top.guoziyang.mydb.backend.server.Server; import top.guoziyang.mydb.backend.tbm.TableManager; import top.guoziyang.mydb.backend.tm.TransactionManager; @@ -17,7 +18,7 @@ public class Launcher { - public static final int port = 9999; + public static final int port = 7777; public static final long DEFALUT_MEM = (1<<20)*64; public static final long KB = 1 << 10; @@ -52,12 +53,18 @@ private static void createDB(String path) { dm.close(); } + + /** + * 改用Netty服务端 + * 作者:RioAngele + * 时间:2023.5.23 + */ private static void openDB(String path, long mem) { TransactionManager tm = TransactionManager.open(path); DataManager dm = DataManager.open(path, mem, tm); VersionManager vm = new VersionManagerImpl(tm, dm); TableManager tbm = TableManager.open(path, vm, dm); - new Server(port, tbm).start(); + new NettyServer(port, tbm).start(); } private static long parseMem(String memStr) { diff --git a/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java b/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java new file mode 100644 index 0000000..681d110 --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java @@ -0,0 +1,69 @@ +package top.guoziyang.mydb.backend.server; + + + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import top.guoziyang.mydb.backend.tbm.TableManager; +import top.guoziyang.mydb.transport.DecoderHandler; +import top.guoziyang.mydb.transport.EncoderHandler; + + +/** + * netty服务端 + * 作者:RioAngele + * 时间:2023.5.23 + */ + +public class NettyServer { + + int PORT ; + TableManager tbm; + public NettyServer(int PORT, TableManager tbm){ + this.PORT=PORT; + this.tbm=tbm; + } + + + + public void start() { + + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new EncoderHandler()) + .addLast(new DecoderHandler()) + .addLast(new NettyServerHandler(tbm)); + } + }); + + ChannelFuture f = b.bind(PORT).sync(); + f.channel().closeFuture().sync(); + } catch (InterruptedException e) { + e.printStackTrace(); + System.out.println("fail to start!!!"); + } finally { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } + + +} + diff --git a/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java b/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java new file mode 100644 index 0000000..738ff68 --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/backend/server/NettyServerHandler.java @@ -0,0 +1,45 @@ +package top.guoziyang.mydb.backend.server; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import top.guoziyang.mydb.backend.tbm.TableManager; +import top.guoziyang.mydb.transport.Encoder; +import top.guoziyang.mydb.transport.Package; +import top.guoziyang.mydb.transport.Packager; + + +/** + * netty服务端Handler + * 作者:RioAngele + * 时间:2023.5.23 + */ +public class NettyServerHandler extends ChannelInboundHandlerAdapter { + private TableManager tbm; + + public NettyServerHandler(TableManager tbm) { + this.tbm = tbm; + } + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + Package pck=(Package) msg; + Executor exe = new Executor(tbm); + byte[] result = null; + Exception e = null; + try { + result = exe.execute(pck.getData()); + } catch (Exception e1) { + e = e1; + e.printStackTrace(); + exe.close(); + } + Package pkg=new Package(result,e); + ctx.writeAndFlush(pkg); + } +} diff --git a/src/main/java/top/guoziyang/mydb/client/Launcher.java b/src/main/java/top/guoziyang/mydb/client/Launcher.java index 86e1c97..f1f14f7 100644 --- a/src/main/java/top/guoziyang/mydb/client/Launcher.java +++ b/src/main/java/top/guoziyang/mydb/client/Launcher.java @@ -8,14 +8,21 @@ import top.guoziyang.mydb.transport.Packager; import top.guoziyang.mydb.transport.Transporter; + +/** +* 改用Netty客户端 +* 作者:RioAngele +* 时间:2023.5.23 +*/ public class Launcher { - public static void main(String[] args) throws UnknownHostException, IOException { - Socket socket = new Socket("127.0.0.1", 9999); - Encoder e = new Encoder(); - Transporter t = new Transporter(socket); - Packager packager = new Packager(t, e); + public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException { +// Socket socket = new Socket("127.0.0.1", 9999); +// Encoder e = new Encoder(); +// Transporter t = new Transporter(socket); +// Packager packager = new Packager(t, e); - Client client = new Client(packager); +// Client client = new Client(packager); + NettyClient client=new NettyClient(); Shell shell = new Shell(client); shell.run(); } diff --git a/src/main/java/top/guoziyang/mydb/client/NettyClient.java b/src/main/java/top/guoziyang/mydb/client/NettyClient.java new file mode 100644 index 0000000..d3be99a --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/client/NettyClient.java @@ -0,0 +1,84 @@ +package top.guoziyang.mydb.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import top.guoziyang.mydb.transport.DecoderHandler; +import top.guoziyang.mydb.transport.EncoderHandler; +import top.guoziyang.mydb.transport.Package; + + +import java.net.InetSocketAddress; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** +* Netty客户端 +* 作者:RioAngele +* 时间:2023.5.23 +*/ +public final class NettyClient { + + private final Bootstrap bootstrap; + private final EventLoopGroup eventLoopGroup; + public Channel channel; + public static CompletableFuture resultFuture; + + public NettyClient() throws InterruptedException { + CompletableFuture resultFuture= new CompletableFuture<>(); + eventLoopGroup = new NioEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.group(eventLoopGroup) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline p = ch.pipeline(); + p.addLast(new EncoderHandler()) + .addLast(new DecoderHandler()) + .addLast(new NettyClientHandler()); + } + }); + channel=bootstrap.connect("127.0.0.1",7777).sync().channel(); + } + + + + public byte[] execute(byte[] sh) throws Exception { + resultFuture=new CompletableFuture(); + Package pkg=new Package(sh,null); + if (channel.isActive()) { + channel.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + System.out.println("success to send"); + } else { + System.out.println("fail to send"); + future.channel().close(); + } + }); + } else { + throw new IllegalStateException(); + } + Package resPkg=null; + while(!resultFuture.isDone()){ + + + } + resPkg=resultFuture.get(); + if(resPkg.getErr() != null) { + throw resPkg.getErr(); + } + return resPkg.getData(); + } + + public void close() { + eventLoopGroup.shutdownGracefully(); + } +} + diff --git a/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java b/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java new file mode 100644 index 0000000..f683e01 --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java @@ -0,0 +1,24 @@ +package top.guoziyang.mydb.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import top.guoziyang.mydb.transport.Package; + + +/** +* 改用Netty客户端Handler +* 作者:RioAngele +* 时间:2023.5.23 +*/ +public class NettyClientHandler extends ChannelInboundHandlerAdapter { + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + NettyClient.resultFuture.complete((Package) msg); + } +} diff --git a/src/main/java/top/guoziyang/mydb/client/Shell.java b/src/main/java/top/guoziyang/mydb/client/Shell.java index 9e65179..cc66074 100644 --- a/src/main/java/top/guoziyang/mydb/client/Shell.java +++ b/src/main/java/top/guoziyang/mydb/client/Shell.java @@ -2,10 +2,16 @@ import java.util.Scanner; + +/** +* 改用Netty客户端 +* 作者:RioAngele +* 时间:2023.5.23 +*/ public class Shell { - private Client client; + private NettyClient client; - public Shell(Client client) { + public Shell(NettyClient client) { this.client = client; } diff --git a/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java b/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java new file mode 100644 index 0000000..8c708c9 --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/transport/DecoderHandler.java @@ -0,0 +1,26 @@ +package top.guoziyang.mydb.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + +import java.util.List; + + +/** +* 解码Handler,将字节流解码为Package +* 作者:RioAngele +* 时间:2023.5.23 +*/ +public class DecoderHandler extends ByteToMessageDecoder{ + Encoder encoder=new Encoder(); + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + if(in.readableBytes()>0){ + byte[] by=new byte[in.readableBytes()]; + in.readBytes(by); + out.add(encoder.decode(by));} + } +} diff --git a/src/main/java/top/guoziyang/mydb/transport/EncoderHandler.java b/src/main/java/top/guoziyang/mydb/transport/EncoderHandler.java new file mode 100644 index 0000000..d1f438b --- /dev/null +++ b/src/main/java/top/guoziyang/mydb/transport/EncoderHandler.java @@ -0,0 +1,19 @@ +package top.guoziyang.mydb.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + + +/** +* 编码Handler,将Package编码为字节流 +* 作者:RioAngele +* 时间:2023.5.23 +*/ +public class EncoderHandler extends MessageToByteEncoder { + Encoder encoder=new Encoder(); + @Override + protected void encode(ChannelHandlerContext ctx, Package msg, ByteBuf out) throws Exception { + out.writeBytes(encoder.encode(msg)); + } +} diff --git a/src/test/java/top/guoziyang/mydb/transport/CodecTest.java b/src/test/java/top/guoziyang/mydb/transport/CodecTest.java new file mode 100644 index 0000000..589888b --- /dev/null +++ b/src/test/java/top/guoziyang/mydb/transport/CodecTest.java @@ -0,0 +1,45 @@ +package top.guoziyang.mydb.transport; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; + + +/** +* 解码编码Handler的单元测试 +* 作者:RioAngele +* 时间:2023.5.29 +*/ +public class CodecTest { + @Test + public void decodeTest(){ + ByteBuf buf= Unpooled.buffer(); + buf.writeInt(1); + buf.writeBytes("create table".getBytes()); + ByteBuf input=buf.duplicate(); + EmbeddedChannel channel=new EmbeddedChannel(new DecoderHandler()); + channel.writeInbound(input); + assertTrue(channel.finish()); + + Package a=channel.readInbound(); + System.out.println(new String(a.getData())); + } + + @Test + public void encodeTest(){ + Package pck=new Package("create table".getBytes(),null); + + EmbeddedChannel channel=new EmbeddedChannel(new EncoderHandler()); + + channel.writeOutbound(pck); + assertTrue(channel.finish()); + + ByteBuf buf=channel.readOutbound(); + byte[] by=new byte[buf.readableBytes()]; + buf.readBytes(by); + System.out.println(new String(by)); + } +}