Skip to content

Commit

Permalink
Enable TCP_NODELAY, alternative approach
Browse files Browse the repository at this point in the history
This also overrides the ping rate limit on server when requested.
  • Loading branch information
edsko committed Jul 11, 2024
1 parent 978cba0 commit d6825a7
Show file tree
Hide file tree
Showing 8 changed files with 448 additions and 139 deletions.
10 changes: 7 additions & 3 deletions grapesy.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -206,16 +206,16 @@ library
, hashable >= 1.3 && < 1.5
, http-types >= 0.12 && < 0.13
, http2 >= 5.2.4 && < 5.3
, http2-tls >= 0.2.11 && < 0.4
, http2-tls >= 0.4 && < 0.5
, lens >= 5.0 && < 5.4
, mtl >= 2.2 && < 2.4
, network >= 3.1 && < 3.3
, network-byte-order >= 0.1 && < 0.2
, network-run >= 0.2.7 && < 0.4
, network-run >= 0.4 && < 0.5
, proto-lens >= 0.7 && < 0.8
, proto-lens-runtime >= 0.7 && < 0.8
, random >= 1.2 && < 1.3
, record-hasfield >= 1.0 && < 1.1
, recv >= 0.1 && < 0.2
, stm >= 2.5 && < 2.6
, text >= 1.2 && < 2.2
, time-manager >= 0.1 && < 0.2
Expand Down Expand Up @@ -576,6 +576,10 @@ benchmark grapesy-kvstore
KVStore.Util.Store

Proto.Kvstore

Paths_grapesy
autogen-modules:
Paths_grapesy
build-depends:
grapesy
build-depends:
Expand Down
36 changes: 29 additions & 7 deletions kvstore/KVStore/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,12 @@ showStats Cmdline{cmdDuration} stats = unlines [
-- separate thread, and kill the thread after some amount of time. The number
-- of RPC calls made can then be read off from the 'IORef'.
client :: Cmdline -> IORef Stats -> IO ()
client Cmdline{cmdJSON} statsVar = do
client Cmdline{
cmdJSON
, cmdSecure
, cmdDisableTcpNoDelay
, cmdPingRateLimit
} statsVar = do
knownKeys <- RandomAccessSet.new
random <- RandomGen.new

Expand Down Expand Up @@ -120,14 +125,31 @@ client Cmdline{cmdJSON} statsVar = do
_ -> error "impossible"
where
params :: ConnParams
params = def
params = def {
connHTTP2Settings = def {
http2TcpNoDelay = not cmdDisableTcpNoDelay
, http2OverridePingRateLimit = cmdPingRateLimit
}
}

server :: Server
server = ServerInsecure $ Address {
addressHost = "127.0.0.1"
, addressPort = defaultInsecurePort
, addressAuthority = Nothing
}
server
| cmdSecure
= ServerSecure
NoServerValidation
SslKeyLogNone -- Let the server write the log
Address {
addressHost = "127.0.0.1"
, addressPort = defaultSecurePort
, addressAuthority = Nothing
}

| otherwise
= ServerInsecure $ Address {
addressHost = "127.0.0.1"
, addressPort = defaultInsecurePort
, addressAuthority = Nothing
}

{-------------------------------------------------------------------------------
Access the various server features
Expand Down
26 changes: 22 additions & 4 deletions kvstore/KVStore/Cmdline.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import Options.Applicative qualified as Opt
-------------------------------------------------------------------------------}

data Cmdline = Cmdline {
cmdMode :: Mode
, cmdDuration :: Int
, cmdSimulateWork :: Bool
, cmdJSON :: Bool
cmdMode :: Mode
, cmdDuration :: Int
, cmdSimulateWork :: Bool
, cmdJSON :: Bool
, cmdSecure :: Bool
, cmdDisableTcpNoDelay :: Bool
, cmdPingRateLimit :: Maybe Int
}

data Mode =
Expand Down Expand Up @@ -59,6 +62,21 @@ parseCmdline =
Opt.long "json"
, Opt.help "Use JSON instead of Protobuf"
])
<*> (Opt.switch $ mconcat [
Opt.long "secure"
, Opt.help "Enable TLS"
])
<*> (Opt.switch $ mconcat [
Opt.long "disable-tcp-nodelay"
, Opt.help "Disable the TCP_NODELAY option"
])
<*> (Opt.optional $
Opt.option Opt.auto $ mconcat [
Opt.long "ping-rate-limit"
, Opt.metavar "PINGs/sec"
, Opt.help "Allow at most this many pings per second from the peer"
]
)

parseMode :: Parser Mode
parseMode = asum [
Expand Down
43 changes: 35 additions & 8 deletions kvstore/KVStore/Server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,42 @@ import KVStore.Cmdline
import KVStore.Util.Store (Store)
import KVStore.Util.Store qualified as Store

import Paths_grapesy

{-------------------------------------------------------------------------------
Server proper
-------------------------------------------------------------------------------}

withKeyValueServer :: Cmdline -> (RunningServer -> IO ()) -> IO ()
withKeyValueServer cmdline@Cmdline{cmdJSON} k = do
withKeyValueServer cmdline@Cmdline{
cmdJSON
, cmdSecure
, cmdDisableTcpNoDelay
, cmdPingRateLimit
} k = do
store <- Store.new

config :: ServerConfig <-
if cmdSecure then do
pub <- getDataFileName "grpc-demo.pem"
priv <- getDataFileName "grpc-demo.key"
return ServerConfig {
serverInsecure = Nothing
, serverSecure = Just $ SecureConfig {
secureHost = "0.0.0.0"
, securePort = defaultSecurePort
, securePubCert = pub
, secureChainCerts = []
, securePrivKey = priv
, secureSslKeyLog = SslKeyLogFromEnv
}
}
else
return ServerConfig {
serverInsecure = Just $ InsecureConfig Nothing defaultInsecurePort
, serverSecure = Nothing
}

let rpcHandlers :: [SomeRpcHandler IO]
rpcHandlers
| cmdJSON = JSON.server $ handlers cmdline store
Expand All @@ -31,19 +59,18 @@ withKeyValueServer cmdline@Cmdline{cmdJSON} k = do
server <- mkGrpcServer params rpcHandlers
forkServer params config server k
where
config :: ServerConfig
config = ServerConfig {
serverInsecure = Just $ InsecureConfig Nothing defaultInsecurePort
, serverSecure = Nothing
}

params :: ServerParams
params = def {
serverHTTP2Settings = def {
http2TcpNoDelay = not cmdDisableTcpNoDelay
, http2OverridePingRateLimit = cmdPingRateLimit
}

-- The Java benchmark does not use compression (unclear if the Java
-- implementation supports compression at all; the compression Interop
-- tests are also disabled for Java). For a fair comparison, we
-- therefore disable compression here also.
serverCompression = Compr.none
, serverCompression = Compr.none
}

{-------------------------------------------------------------------------------
Expand Down
61 changes: 41 additions & 20 deletions src/Network/GRPC/Client/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ data ConnParams = ConnParams {
-- messages sent by the client to the server.
, connInitCompression :: Maybe Compression

-- | HTTP2 settings
, connHTTP2Settings :: HTTP2Settings
}

Expand Down Expand Up @@ -512,8 +513,11 @@ stayConnected connParams server connStateVar connOutOfScope =

-- | Insecure connection (no TLS)
connectInsecure :: ConnParams -> Attempt -> Address -> IO ()
connectInsecure connParams attempt addr =
runTCPClient addr $ \sock -> do
connectInsecure connParams attempt addr = do
Run.runTCPClientWithSettings
runSettings
(addressHost addr)
(show $ addressPort addr) $ \sock ->
bracket (HTTP2.Client.allocSimpleConfig sock writeBufferSize)
HTTP2.Client.freeSimpleConfig $ \conf ->
HTTP2.Client.run clientConfig conf $ \sendRequest _aux -> do
Expand All @@ -523,23 +527,31 @@ connectInsecure connParams attempt addr =
ConnectionReady (attemptClosed attempt) conn
takeMVar $ attemptOutOfScope attempt
where
ConnParams{connHTTP2Settings} = connParams

runSettings :: Run.Settings
runSettings = Run.defaultSettings {
Run.settingsOpenClientSocket = openClientSocket connHTTP2Settings
}

settings :: HTTP2.Client.Settings
settings = HTTP2.Client.defaultSettings {
HTTP2.Client.maxConcurrentStreams =
Just . fromIntegral $
http2MaxConcurrentStreams (connHTTP2Settings connParams)
http2MaxConcurrentStreams connHTTP2Settings
, HTTP2.Client.initialWindowSize =
fromIntegral $
http2StreamWindowSize (connHTTP2Settings connParams)
http2StreamWindowSize connHTTP2Settings
}

clientConfig :: HTTP2.Client.ClientConfig
clientConfig = overridePingRateLimit connParams $
HTTP2.Client.defaultClientConfig {
HTTP2.Client.authority = authority addr
, HTTP2.Client.settings = settings
, HTTP2.Client.connectionWindowSize =
fromIntegral $
http2ConnectionWindowSize (connHTTP2Settings connParams)
http2ConnectionWindowSize connHTTP2Settings
}

-- | Secure connection (using TLS)
Expand All @@ -560,19 +572,19 @@ connectSecure connParams attempt validation sslKeyLog addr = do
case validation of
ValidateServer _ -> True
NoServerValidation -> False
, HTTP2.TLS.Client.settingsCAStore = caStore
, HTTP2.TLS.Client.settingsKeyLogger = keyLogger
, HTTP2.TLS.Client.settingsAddrInfoFlags = []

, HTTP2.TLS.Client.settingsConcurrentStreams =
fromIntegral $
http2MaxConcurrentStreams (connHTTP2Settings connParams)
, HTTP2.TLS.Client.settingsStreamWindowSize =
fromIntegral $
http2StreamWindowSize (connHTTP2Settings connParams)
, HTTP2.TLS.Client.settingsConnectionWindowSize =
fromIntegral $
http2ConnectionWindowSize (connHTTP2Settings connParams)
, HTTP2.TLS.Client.settingsCAStore = caStore
, HTTP2.TLS.Client.settingsKeyLogger = keyLogger
, HTTP2.TLS.Client.settingsAddrInfoFlags = []

, HTTP2.TLS.Client.settingsOpenClientSocket =
openClientSocket connHTTP2Settings
, HTTP2.TLS.Client.settingsConcurrentStreams = fromIntegral $
http2MaxConcurrentStreams connHTTP2Settings
, HTTP2.TLS.Client.settingsStreamWindowSize = fromIntegral $
http2StreamWindowSize connHTTP2Settings
, HTTP2.TLS.Client.settingsConnectionWindowSize = fromIntegral $
http2ConnectionWindowSize connHTTP2Settings
}

clientConfig :: HTTP2.Client.ClientConfig
Expand All @@ -592,6 +604,8 @@ connectSecure connParams attempt validation sslKeyLog addr = do
writeTVar (attemptState attempt) $
ConnectionReady (attemptClosed attempt) conn
takeMVar $ attemptOutOfScope attempt
where
ConnParams{connHTTP2Settings} = connParams

-- | Authority
--
Expand Down Expand Up @@ -624,9 +638,16 @@ overridePingRateLimit connParams clientConfig = clientConfig {
Auxiliary http2
-------------------------------------------------------------------------------}

runTCPClient :: Address -> (Socket -> IO a) -> IO a
runTCPClient Address{addressHost, addressPort} =
Run.runTCPClient addressHost (show addressPort)
openClientSocket :: HTTP2Settings -> AddrInfo -> IO Socket
openClientSocket http2Settings =
Run.openClientSocketWithOptions socketOptions
where
socketOptions :: [(SocketOption, Int)]
socketOptions = concat [
[ (NoDelay, 1)
| http2TcpNoDelay http2Settings
]
]

-- | Write-buffer size
--
Expand Down
32 changes: 32 additions & 0 deletions src/Network/GRPC/Common/HTTP2Settings.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,37 @@ data HTTP2Settings = HTTP2Settings {
-- connecting to a peer that you trust, you can set this limit to
-- 'maxBound' (effectively turning off protecting against ping flooding).
, http2OverridePingRateLimit :: Maybe Int

-- | Enable @TCP_NODELAY@
--
-- Send out TCP segments as soon as possible, even if there is only a
-- small amount of data.
--
-- When @TCP_NODELAY@ is /NOT/ set, the TCP implementation will wait to
-- send a TCP segment to the receiving peer until either (1) there is
-- enough data to fill a certain minimum segment size or (2) we receive an
-- ACK from the receiving peer for data we sent previously. This adds a
-- network roundtrip delay to every RPC message we want to send (to
-- receive the ACK). If the peer uses TCP delayed acknowledgement, which
-- will typically be the case, then this delay will increase further
-- still; default for delayed acknowledgement is 40ms, thus resulting in a
-- theoretical maximum of 25 RPCs/sec.
--
-- We therefore enable TCP_NODELAY by default, so that data is sent to the
-- peer as soon as we have an entire gRPC message serialized and ready to
-- send (we send the data to the TCP layer only once an entire message is
-- written, or the @http2@ write buffer is full).
--
-- Turning this off /could/ improve throughput, as fewer TCP segments will
-- be needed, but you probably only want to do this if you send very few
-- very large RPC messages. In gRPC this is anyway discouraged, because
-- gRPC messages do not support incremental (de)serialization; if you need
-- to send large amounts of data, it is preferable to split these into
-- many, smaller, gRPC messages; this also gives the application the
-- possibility of reporting on data transmission progress.
--
-- TL;DR: leave this at the default unless you know what you are doing.
, http2TcpNoDelay :: Bool
}
deriving (Show)

Expand All @@ -82,6 +113,7 @@ defaultHTTP2Settings = HTTP2Settings {
, http2StreamWindowSize = defInitialStreamWindowSize
, http2ConnectionWindowSize = defMaxConcurrentStreams * defInitialStreamWindowSize
, http2OverridePingRateLimit = Nothing
, http2TcpNoDelay = True
}
where
defMaxConcurrentStreams = 128
Expand Down
Loading

0 comments on commit d6825a7

Please sign in to comment.