Skip to content

Commit

Permalink
Add peer tcp unicast (#764)
Browse files Browse the repository at this point in the history
* feat: add tcp dummy functions

* feat: add unix listen_tcp

* fix: don't print error if connection is closed

* feat: add is_open arg for peer mode

* fix: listen tcp

* feat: add peer unicast

* refactor: _z_open

* fix: peer multicast test config

* fix: timing

* feat: switch from bool to enum

* feat: remove tcp peer timing
  • Loading branch information
jean-roland authored Oct 28, 2024
1 parent 232c445 commit 5dd1fcd
Show file tree
Hide file tree
Showing 11 changed files with 334 additions and 183 deletions.
3 changes: 1 addition & 2 deletions include/zenoh-pico/link/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
#include "zenoh-pico/config.h"
#include "zenoh-pico/link/link.h"

#if Z_FEATURE_LINK_TCP == 1
z_result_t _z_endpoint_tcp_valid(_z_endpoint_t *ep);
z_result_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *ep);
#endif

#if Z_FEATURE_LINK_UDP_UNICAST == 1
z_result_t _z_endpoint_udp_unicast_valid(_z_endpoint_t *ep);
z_result_t _z_new_link_udp_unicast(_z_link_t *zl, _z_endpoint_t ep);
Expand Down
7 changes: 6 additions & 1 deletion include/zenoh-pico/transport/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
#include "zenoh-pico/link/manager.h"
#include "zenoh-pico/transport/transport.h"

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode);
enum _z_peer_op_e {
_Z_PEER_OP_OPEN = 0,
_Z_PEER_OP_LISTEN = 1,
};

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op);
void _z_free_transport(_z_transport_t **zt);

#endif /* INCLUDE_ZENOH_PICO_TRANSPORT_MANAGER_H */
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ z_result_t _z_unicast_transport_create(_z_transport_t *zt, _z_link_t *zl,
z_result_t _z_unicast_open_client(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl,
const _z_id_t *local_zid);
z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl,
const _z_id_t *local_zid);
const _z_id_t *local_zid, int peer_op);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
Expand Down
9 changes: 4 additions & 5 deletions src/link/link.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator) {
_z_endpoint_t ep;
ret = _z_endpoint_from_string(&ep, locator);
if (ret == _Z_RES_OK) {
// TODO[peer]: when peer unicast mode is supported, this must be revisited
// Create transport link
#if Z_FEATURE_LINK_TCP == 1
if (_z_endpoint_tcp_valid(&ep) == _Z_RES_OK) {
ret = _z_new_link_tcp(zl, &ep);
} else
#endif
#if Z_FEATURE_LINK_UDP_UNICAST == 1
if (_z_endpoint_udp_unicast_valid(&ep) == _Z_RES_OK) {
ret = _z_new_link_udp_unicast(zl, ep);
Expand Down Expand Up @@ -80,10 +77,12 @@ z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator) {
_z_endpoint_t ep;
ret = _z_endpoint_from_string(&ep, locator);
if (ret == _Z_RES_OK) {
// TODO[peer]: when peer unicast mode is supported, this must be revisited
// Create transport link
if (_z_endpoint_tcp_valid(&ep) == _Z_RES_OK) {
ret = _z_new_link_tcp(zl, &ep);
} else
#if Z_FEATURE_LINK_UDP_MULTICAST == 1
if (_z_endpoint_udp_multicast_valid(&ep) == _Z_RES_OK) {
if (_z_endpoint_udp_multicast_valid(&ep) == _Z_RES_OK) {
ret = _z_new_link_udp_multicast(zl, ep);
} else
#endif
Expand Down
11 changes: 11 additions & 0 deletions src/link/unicast/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,15 @@ z_result_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) {

return ret;
}
#else
z_result_t _z_endpoint_tcp_valid(_z_endpoint_t *endpoint) {
_ZP_UNUSED(endpoint);
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}

z_result_t _z_new_link_tcp(_z_link_t *zl, _z_endpoint_t *endpoint) {
_ZP_UNUSED(zl);
_ZP_UNUSED(endpoint);
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}
#endif
140 changes: 70 additions & 70 deletions src/net/session.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
#include "zenoh-pico/utils/logging.h"
#include "zenoh-pico/utils/uuid.h"

z_result_t __z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, z_whatami_t mode) {
static z_result_t __z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, z_whatami_t mode, int peer_op) {
z_result_t ret = _Z_RES_OK;

_z_id_t local_zid = _z_id_empty();
Expand All @@ -46,7 +46,7 @@ z_result_t __z_open_inner(_z_session_rc_t *zn, _z_string_t *locator, z_whatami_t
local_zid = _z_id_empty();
return ret;
}
ret = _z_new_transport(&_Z_RC_IN_VAL(zn)->_tp, &local_zid, locator, mode);
ret = _z_new_transport(&_Z_RC_IN_VAL(zn)->_tp, &local_zid, locator, mode, peer_op);
if (ret != _Z_RES_OK) {
local_zid = _z_id_empty();
return ret;
Expand All @@ -64,89 +64,89 @@ z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config) {
if (opt_as_str != NULL) {
_z_uuid_to_bytes(zid.id, opt_as_str);
}
if (config == NULL) {
_Z_ERROR("A valid config is missing.");
return _Z_ERR_GENERIC;
}
int peer_op = _Z_PEER_OP_LISTEN;
_z_string_svec_t locators = _z_string_svec_make(0);
char *connect = _z_config_get(config, Z_CONFIG_CONNECT_KEY);
char *listen = _z_config_get(config, Z_CONFIG_LISTEN_KEY);
if (connect == NULL && listen == NULL) { // Scout if peer is not configured
opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT;
}
z_what_t what = strtol(opt_as_str, NULL, 10);

if (config != NULL) {
_z_string_svec_t locators = _z_string_svec_make(0);
char *connect = _z_config_get(config, Z_CONFIG_CONNECT_KEY);
char *listen = _z_config_get(config, Z_CONFIG_LISTEN_KEY);
if (connect == NULL && listen == NULL) { // Scout if peer is not configured
opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_WHAT_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_SCOUTING_WHAT_DEFAULT;
}
z_what_t what = strtol(opt_as_str, NULL, 10);

opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT;
}
_z_string_t mcast_locator = _z_string_alias_str(opt_as_str);
opt_as_str = _z_config_get(config, Z_CONFIG_MULTICAST_LOCATOR_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_MULTICAST_LOCATOR_DEFAULT;
}
_z_string_t mcast_locator = _z_string_alias_str(opt_as_str);

opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT;
}
uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10);
opt_as_str = _z_config_get(config, Z_CONFIG_SCOUTING_TIMEOUT_KEY);
if (opt_as_str == NULL) {
opt_as_str = (char *)Z_CONFIG_SCOUTING_TIMEOUT_DEFAULT;
}
uint32_t timeout = (uint32_t)strtoul(opt_as_str, NULL, 10);

// Scout and return upon the first result
_z_hello_list_t *hellos = _z_scout_inner(what, zid, &mcast_locator, timeout, true);
if (hellos != NULL) {
_z_hello_t *hello = _z_hello_list_head(hellos);
_z_string_svec_copy(&locators, &hello->_locators);
// Scout and return upon the first result
_z_hello_list_t *hellos = _z_scout_inner(what, zid, &mcast_locator, timeout, true);
if (hellos != NULL) {
_z_hello_t *hello = _z_hello_list_head(hellos);
_z_string_svec_copy(&locators, &hello->_locators);
}
_z_hello_list_free(&hellos);
} else {
uint_fast8_t key = Z_CONFIG_CONNECT_KEY;
if (listen != NULL) {
if (connect == NULL) {
key = Z_CONFIG_LISTEN_KEY;
_zp_config_insert(config, Z_CONFIG_MODE_KEY, Z_CONFIG_MODE_PEER);
} else {
return _Z_ERR_GENERIC;
}
_z_hello_list_free(&hellos);
} else {
uint_fast8_t key = Z_CONFIG_CONNECT_KEY;
if (listen != NULL) {
if (connect == NULL) {
key = Z_CONFIG_LISTEN_KEY;
_zp_config_insert(config, Z_CONFIG_MODE_KEY, Z_CONFIG_MODE_PEER);
} else {
return _Z_ERR_GENERIC;
}
}
locators = _z_string_svec_make(1);
_z_string_t s = _z_string_copy_from_str(_z_config_get(config, key));
_z_string_svec_append(&locators, &s);
peer_op = _Z_PEER_OP_OPEN;
}
locators = _z_string_svec_make(1);
_z_string_t s = _z_string_copy_from_str(_z_config_get(config, key));
_z_string_svec_append(&locators, &s);
}

ret = _Z_ERR_SCOUT_NO_RESULTS;
size_t len = _z_string_svec_len(&locators);
for (size_t i = 0; i < len; i++) {
ret = _Z_RES_OK;
ret = _Z_ERR_SCOUT_NO_RESULTS;
size_t len = _z_string_svec_len(&locators);
for (size_t i = 0; i < len; i++) {
ret = _Z_RES_OK;

_z_string_t *locator = _z_string_svec_get(&locators, i);
// @TODO: check invalid configurations
// For example, client mode in multicast links
_z_string_t *locator = _z_string_svec_get(&locators, i);
// @TODO: check invalid configurations
// For example, client mode in multicast links

// Check operation mode
char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY);
z_whatami_t mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client
if (s_mode != NULL) {
if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) {
mode = Z_WHATAMI_CLIENT;
} else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) {
mode = Z_WHATAMI_PEER;
} else {
ret = _Z_ERR_CONFIG_INVALID_MODE;
}
// Check operation mode
char *s_mode = _z_config_get(config, Z_CONFIG_MODE_KEY);
z_whatami_t mode = Z_WHATAMI_CLIENT; // By default, zenoh-pico will operate as a client
if (s_mode != NULL) {
if (_z_str_eq(s_mode, Z_CONFIG_MODE_CLIENT) == true) {
mode = Z_WHATAMI_CLIENT;
} else if (_z_str_eq(s_mode, Z_CONFIG_MODE_PEER) == true) {
mode = Z_WHATAMI_PEER;
} else {
ret = _Z_ERR_CONFIG_INVALID_MODE;
}
}

if (ret == _Z_RES_OK) {
ret = __z_open_inner(zn, locator, mode, peer_op);
if (ret == _Z_RES_OK) {
ret = __z_open_inner(zn, locator, mode);
if (ret == _Z_RES_OK) {
break;
}
} else {
_Z_ERROR("Trying to configure an invalid mode.");
break;
}
} else {
_Z_ERROR("Trying to configure an invalid mode.");
}
_z_string_svec_clear(&locators);
} else {
_Z_ERROR("A valid config is missing.");
ret = _Z_ERR_GENERIC;
}

_z_string_svec_clear(&locators);
return ret;
}

Expand Down
69 changes: 63 additions & 6 deletions src/system/unix/network.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,69 @@ z_result_t _z_open_tcp(_z_sys_net_socket_t *sock, const _z_sys_net_endpoint_t re

z_result_t _z_listen_tcp(_z_sys_net_socket_t *sock, const _z_sys_net_endpoint_t lep) {
z_result_t ret = _Z_RES_OK;
(void)sock;
(void)lep;

// @TODO: To be implemented
ret = _Z_ERR_GENERIC;

// Open socket
sock->_fd = socket(lep._iptcp->ai_family, lep._iptcp->ai_socktype, lep._iptcp->ai_protocol);
if (sock->_fd == -1) {
return _Z_ERR_GENERIC;
}
// Set options
int value = true;
if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) < 0)) {
ret = _Z_ERR_GENERIC;
}
int flags = 1;
if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)) < 0)) {
ret = _Z_ERR_GENERIC;
}
#if Z_FEATURE_TCP_NODELAY == 1
if ((ret == _Z_RES_OK) && (setsockopt(sock->_fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)) < 0)) {
ret = _Z_ERR_GENERIC;
}
#endif
struct linger ling;
ling.l_onoff = 1;
ling.l_linger = Z_TRANSPORT_LEASE / 1000;
if ((ret == _Z_RES_OK) &&
(setsockopt(sock->_fd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(struct linger)) < 0)) {
ret = _Z_ERR_GENERIC;
}
#if defined(ZENOH_MACOS) || defined(ZENOH_BSD)
setsockopt(sock->_fd, SOL_SOCKET, SO_NOSIGPIPE, (void *)0, sizeof(int));
#endif
if (ret != _Z_RES_OK) {
close(sock->_fd);
return ret;
}
struct addrinfo *it = NULL;
for (it = lep._iptcp; it != NULL; it = it->ai_next) {
if (bind(sock->_fd, it->ai_addr, it->ai_addrlen) < 0) {
if (it->ai_next == NULL) {
ret = _Z_ERR_GENERIC;
break;
}
}
if (listen(sock->_fd, 1) < 0) {
if (it->ai_next == NULL) {
ret = _Z_ERR_GENERIC;
break;
}
}
struct sockaddr naddr;
unsigned int nlen = sizeof(naddr);
int con_socket = accept(sock->_fd, &naddr, &nlen);
if (con_socket < 0) {
if (it->ai_next == NULL) {
ret = _Z_ERR_GENERIC;
break;
}
} else {
sock->_fd = con_socket;
break;
}
}
if (ret != _Z_RES_OK) {
close(sock->_fd);
}
return ret;
}

Expand Down
16 changes: 10 additions & 6 deletions src/transport/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "zenoh-pico/transport/multicast/transport.h"
#include "zenoh-pico/transport/unicast/transport.h"

z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid) {
static z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid) {
z_result_t ret = _Z_RES_OK;
// Init link
_z_link_t zl;
Expand Down Expand Up @@ -62,20 +62,24 @@ z_result_t _z_new_transport_client(_z_transport_t *zt, _z_string_t *locator, _z_
return ret;
}

z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid) {
static z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator, _z_id_t *local_zid, int peer_op) {
z_result_t ret = _Z_RES_OK;
// Init link
_z_link_t zl;
memset(&zl, 0, sizeof(_z_link_t));
// Listen link
ret = _z_listen_link(&zl, locator);
if (peer_op == _Z_PEER_OP_OPEN) {
ret = _z_open_link(&zl, locator);
} else {
ret = _z_listen_link(&zl, locator);
}
if (ret != _Z_RES_OK) {
return ret;
}
switch (zl._cap._transport) {
case Z_LINK_CAP_TRANSPORT_UNICAST: {
_z_transport_unicast_establish_param_t tp_param;
ret = _z_unicast_open_peer(&tp_param, &zl, local_zid);
ret = _z_unicast_open_peer(&tp_param, &zl, local_zid, peer_op);
if (ret != _Z_RES_OK) {
_z_link_clear(&zl);
return ret;
Expand All @@ -101,13 +105,13 @@ z_result_t _z_new_transport_peer(_z_transport_t *zt, _z_string_t *locator, _z_id
return ret;
}

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode) {
z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op) {
z_result_t ret;

if (mode == Z_WHATAMI_CLIENT) {
ret = _z_new_transport_client(zt, locator, bs);
} else {
ret = _z_new_transport_peer(zt, locator, bs);
ret = _z_new_transport_peer(zt, locator, bs, peer_op);
}

return ret;
Expand Down
4 changes: 3 additions & 1 deletion src/transport/unicast/read.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ void *_zp_unicast_read_task(void *ztu_arg) {
if (ret == _Z_RES_OK) {
_z_t_msg_clear(&t_msg);
} else {
_Z_ERROR("Connection closed due to message processing error: %d", ret);
if (ret != _Z_ERR_CONNECTION_CLOSED) {
_Z_ERROR("Connection closed due to message processing error: %d", ret);
}
ztu->_read_task_running = false;
continue;
}
Expand Down
Loading

0 comments on commit 5dd1fcd

Please sign in to comment.