Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

实现了使用Netty代替Socket进行客户端服务端通信 #15

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
</properties>

<dependencies>
Expand Down Expand Up @@ -41,5 +41,11 @@
<artifactId>commons-cli</artifactId>
<version>1.5.0</version>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha1</version>
</dependency>
</dependencies>
</project>
11 changes: 9 additions & 2 deletions src/main/java/top/guoziyang/mydb/backend/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.commons.cli.ParseException;

import top.guoziyang.mydb.backend.dm.DataManager;
import top.guoziyang.mydb.backend.server.NettyServer;
import top.guoziyang.mydb.backend.server.Server;
import top.guoziyang.mydb.backend.tbm.TableManager;
import top.guoziyang.mydb.backend.tm.TransactionManager;
Expand All @@ -17,7 +18,7 @@

public class Launcher {

public static final int port = 9999;
public static final int port = 7777;

public static final long DEFALUT_MEM = (1<<20)*64;
public static final long KB = 1 << 10;
Expand Down Expand Up @@ -52,12 +53,18 @@ private static void createDB(String path) {
dm.close();
}


/**
* 改用Netty服务端
* 作者:RioAngele
* 时间:2023.5.23
*/
private static void openDB(String path, long mem) {
TransactionManager tm = TransactionManager.open(path);
DataManager dm = DataManager.open(path, mem, tm);
VersionManager vm = new VersionManagerImpl(tm, dm);
TableManager tbm = TableManager.open(path, vm, dm);
new Server(port, tbm).start();
new NettyServer(port, tbm).start();
}

private static long parseMem(String memStr) {
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/top/guoziyang/mydb/backend/dm/Recover.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static void redoTranscations(TransactionManager tm, Logger lg, PageCache
InsertLogInfo li = parseInsertLog(log);
long xid = li.xid;
if(!tm.isActive(xid)) {
doInsertLog(pc, log, REDO);
doInsertLog(pc, li, REDO);
}
} else {
UpdateLogInfo xi = parseUpdateLog(log);
Expand Down Expand Up @@ -240,4 +240,27 @@ private static void doInsertLog(PageCache pc, byte[] log, int flag) {
pg.release();
}
}

/*
* 重载doInsertLog(),减少log重复解析消耗
* 作者:RioAngele
* 时间:2023.5.23
*/

private static void doInsertLog(PageCache pc,InsertLogInfo li , int flag) {
Page pg = null;
try {
pg = pc.getPage(li.pgno);
} catch(Exception e) {
Panic.panic(e);
}
try {
if(flag == UNDO) {
DataItem.setDataItemRawInvalid(li.raw);
}
PageX.recoverInsert(pg, li.raw, li.offset);
} finally {
pg.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ public class PageCacheImpl extends AbstractCache<Page> implements PageCache {
this.fileLock = new ReentrantLock();
this.pageNumbers = new AtomicInteger((int)length / PAGE_SIZE);
}

/*
* 改用重载方法flush()
*/
public int newPage(byte[] initData) {
int pgno = pageNumbers.incrementAndGet();
Page pg = new PageImpl(pgno, initData, null);
flush(pg);
// Page pg = new PageImpl(pgno, initData, null);
// flush(pg);
flush(pgno, initData);
return pgno;
}

Expand Down Expand Up @@ -106,6 +109,28 @@ private void flush(Page pg) {
}
}

/*
* 重载 flush()方法
* 作者:RioAngle
* 时间:2023.5.23
*/

private void flush(int pgno,byte[] data) {
long offset = pageOffset(pgno);

fileLock.lock();
try {
ByteBuffer buf = ByteBuffer.wrap(data);
fc.position(offset);
fc.write(buf);
fc.force(false);
} catch(IOException e) {
Panic.panic(e);
} finally {
fileLock.unlock();
}
}

public void truncateByBgno(int maxPgno) {
long size = pageOffset(maxPgno + 1);
try {
Expand Down
69 changes: 69 additions & 0 deletions src/main/java/top/guoziyang/mydb/backend/server/NettyServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package top.guoziyang.mydb.backend.server;



import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import top.guoziyang.mydb.backend.tbm.TableManager;
import top.guoziyang.mydb.transport.DecoderHandler;
import top.guoziyang.mydb.transport.EncoderHandler;


/**
* netty服务端
* 作者:RioAngele
* 时间:2023.5.23
*/

public class NettyServer {

int PORT ;
TableManager tbm;
public NettyServer(int PORT, TableManager tbm){
this.PORT=PORT;
this.tbm=tbm;
}



public void start() {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EncoderHandler())
.addLast(new DecoderHandler())
.addLast(new NettyServerHandler(tbm));
}
});

ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("fail to start!!!");
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}


}

Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package top.guoziyang.mydb.backend.server;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import top.guoziyang.mydb.backend.tbm.TableManager;
import top.guoziyang.mydb.transport.Encoder;
import top.guoziyang.mydb.transport.Package;
import top.guoziyang.mydb.transport.Packager;


/**
* netty服务端Handler
* 作者:RioAngele
* 时间:2023.5.23
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private TableManager tbm;

public NettyServerHandler(TableManager tbm) {
this.tbm = tbm;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Package pck=(Package) msg;
Executor exe = new Executor(tbm);
byte[] result = null;
Exception e = null;
try {
result = exe.execute(pck.getData());
} catch (Exception e1) {
e = e1;
e.printStackTrace();
exe.close();
}
Package pkg=new Package(result,e);
ctx.writeAndFlush(pkg);
}
}
19 changes: 13 additions & 6 deletions src/main/java/top/guoziyang/mydb/client/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,21 @@
import top.guoziyang.mydb.transport.Packager;
import top.guoziyang.mydb.transport.Transporter;


/**
* 改用Netty客户端
* 作者:RioAngele
* 时间:2023.5.23
*/
public class Launcher {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("127.0.0.1", 9999);
Encoder e = new Encoder();
Transporter t = new Transporter(socket);
Packager packager = new Packager(t, e);
public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
// Socket socket = new Socket("127.0.0.1", 9999);
// Encoder e = new Encoder();
// Transporter t = new Transporter(socket);
// Packager packager = new Packager(t, e);

Client client = new Client(packager);
// Client client = new Client(packager);
NettyClient client=new NettyClient();
Shell shell = new Shell(client);
shell.run();
}
Expand Down
84 changes: 84 additions & 0 deletions src/main/java/top/guoziyang/mydb/client/NettyClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package top.guoziyang.mydb.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import top.guoziyang.mydb.transport.DecoderHandler;
import top.guoziyang.mydb.transport.EncoderHandler;
import top.guoziyang.mydb.transport.Package;


import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Netty客户端
* 作者:RioAngele
* 时间:2023.5.23
*/
public final class NettyClient {

private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
public Channel channel;
public static CompletableFuture<Package> resultFuture;

public NettyClient() throws InterruptedException {
CompletableFuture<byte[]> resultFuture= new CompletableFuture<>();
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new EncoderHandler())
.addLast(new DecoderHandler())
.addLast(new NettyClientHandler());
}
});
channel=bootstrap.connect("127.0.0.1",7777).sync().channel();
}



public byte[] execute(byte[] sh) throws Exception {
resultFuture=new CompletableFuture<Package>();
Package pkg=new Package(sh,null);
if (channel.isActive()) {
channel.writeAndFlush(pkg).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
System.out.println("success to send");
} else {
System.out.println("fail to send");
future.channel().close();
}
});
} else {
throw new IllegalStateException();
}
Package resPkg=null;
while(!resultFuture.isDone()){


}
resPkg=resultFuture.get();
if(resPkg.getErr() != null) {
throw resPkg.getErr();
}
return resPkg.getData();
}

public void close() {
eventLoopGroup.shutdownGracefully();
}
}

24 changes: 24 additions & 0 deletions src/main/java/top/guoziyang/mydb/client/NettyClientHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package top.guoziyang.mydb.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import top.guoziyang.mydb.transport.Package;


/**
* 改用Netty客户端Handler
* 作者:RioAngele
* 时间:2023.5.23
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyClient.resultFuture.complete((Package) msg);
}
}
Loading