46#include <netinet/tcp.h>
47#include <sys/socket.h>
58#include "debug/DistEthernet.hh"
59#include "debug/DistEthernetCmd.hh"
62#if defined(__FreeBSD__)
63#include <netinet/in.h>
68#if defined(__APPLE__) || defined(__MACH__)
70#define MSG_NOSIGNAL SO_NOSIGPIPE
83 unsigned dist_rank,
unsigned dist_size,
87 DistIface(dist_rank, dist_size, sync_start, sync_repeat,
em, use_pseudo_op,
88 is_switch, num_nodes),
serverName(server_name),
93 DPRINTF(DistEthernet,
"TCPIface(listen): Can't bind port %d\n",
104 for (
int i = 0;
i <
size;
i++) {
106 DPRINTF(DistEthernet,
"First connection, waiting for link info\n");
108 panic(
"Failed to receive link info");
118 panic(
"Socket already listening!");
120 struct sockaddr_in sockaddr;
123 fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
126 sockaddr.sin_family = PF_INET;
127 sockaddr.sin_addr.s_addr = INADDR_ANY;
128 sockaddr.sin_port = htons(port);
130 memset(&sockaddr.sin_zero, 0,
sizeof(sockaddr.sin_zero));
131 ret = ::bind(
fdStatic, (
struct sockaddr *)&sockaddr,
sizeof (sockaddr));
134 if (ret == -1 && errno != EADDRINUSE)
135 panic(
"ListenSocket(listen): bind() failed!");
140 if (errno != EADDRINUSE)
141 panic(
"ListenSocket(listen): listen() failed!");
154 static unsigned cur_rank = 0;
155 static unsigned cur_id = 0;
163 return cn.first.rank == cur_rank;
165 assert(iface0 !=
nodes.end());
166 assert(iface0->first.distIfaceId == 0);
167 sock = iface0->second;
171 DPRINTF(DistEthernet,
"Next connection, waiting for link info\n");
173 panic(
"Failed to receive link info");
174 assert(
ni.rank == cur_rank);
175 assert(
ni.distIfaceId == cur_id);
177 inform(
"Link okay (iface:%d -> (node:%d, iface:%d))",
179 if (
ni.distIfaceId <
ni.distIfaceNum - 1) {
196 DPRINTF(DistEthernet,
"Connected, waiting for ack (distIfaceId:%d\n",
199 panic(
"Failed to receive ack");
210 struct sockaddr_in sockaddr;
211 socklen_t slen =
sizeof (sockaddr);
215 if (setsockopt(
sock, IPPROTO_TCP, TCP_NODELAY, (
char *)&
i,
217 warn(
"ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
224 struct addrinfo addr_hint, *addr_results;
227 std::string port_str = std::to_string(
serverPort);
229 sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
230 panic_if(
sock < 0,
"socket() failed: %s", strerror(errno));
233 if (setsockopt(
sock, IPPROTO_TCP, TCP_NODELAY, (
char *)&fl,
sizeof(fl)) < 0)
234 warn(
"ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");
236 bzero(&addr_hint,
sizeof(addr_hint));
237 addr_hint.ai_family = AF_INET;
238 addr_hint.ai_socktype = SOCK_STREAM;
239 addr_hint.ai_protocol = IPPROTO_TCP;
241 ret = getaddrinfo(
serverName.c_str(), port_str.c_str(),
242 &addr_hint, &addr_results);
243 panic_if(ret < 0,
"getaddrinf() failed: %s", strerror(errno));
245 DPRINTF(DistEthernet,
"Connecting to %s:%s\n",
248 ret =
::connect(
sock, (
struct sockaddr *)(addr_results->ai_addr),
249 addr_results->ai_addrlen);
250 panic_if(ret < 0,
"connect() failed: %s", strerror(errno));
252 freeaddrinfo(addr_results);
257 [[maybe_unused]]
int ret;
268 ret = ::send(
sock, buf, length, MSG_NOSIGNAL);
270 if (errno == ECONNRESET || errno == EPIPE) {
271 exitSimLoop(
"Message server closed connection, simulation "
274 panic(
"send() failed: %s", strerror(errno));
277 panic_if(ret != length,
"send() failed");
285 ret = ::recv(
sock, buf, length, MSG_WAITALL );
287 if (errno == ECONNRESET || errno == EPIPE)
288 inform(
"recv(): %s", strerror(errno));
290 panic(
"recv() failed: %s", strerror(errno));
291 }
else if (ret == 0) {
292 inform(
"recv(): Connection closed");
293 }
else if (ret != length)
294 panic(
"recv() failed");
296 return (ret == length);
309 DPRINTF(DistEthernetCmd,
"TCPIface::sendCmd() type: %d\n",
310 static_cast<int>(
header.msgType));
322 DPRINTF(DistEthernetCmd,
"TCPIface::recvHeader() type: %d ret: %d\n",
323 static_cast<int>(
header.msgType), ret);
330 packet = std::make_shared<EthPacketData>(
header.dataPacketLength);
332 panic_if(!ret,
"Error while reading socket");
333 packet->simLength =
header.simLength;
334 packet->length =
header.dataPacketLength;
Defines global host-dependent types: Counter, Tick, and (indirectly) {int,uint}{8,...
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
DistHeaderPkt::Header Header
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
unsigned size
The number of gem5 processes comprising this dist simulation.
bool recvHeader(Header &header) override
Receive a header (i.e.
void initTransport() override
Init hook for the underlaying transport.
static std::vector< int > sockRegistry
Storage for all opened sockets.
static std::vector< std::pair< NodeInfo, int > > nodes
void sendPacket(const Header &header, const EthPacketPtr &packet) override
Send out a data packet to the remote end.
bool recvTCP(int sock, void *buf, unsigned length)
Receive the next incoming message through a TCP stream socket.
void recvPacket(const Header &header, EthPacketPtr &packet) override
Receive a packet from the remote end.
TCPIface(std::string server_name, unsigned server_port, unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
The ctor creates and connects the stream socket to the server.
int sock
The stream socket to connect to the server.
void establishConnection()
void sendCmd(const Header &header) override
Send out a control command to the remote end.
void sendTCP(int sock, const void *buf, unsigned length)
Send out a message through a TCP stream socket.
#define panic(...)
This implements a cprintf based panic() function.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Copyright (c) 2024 Arm Limited All rights reserved.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
The "old style" exitSimLoop functions.
uint64_t Tick
Tick count type.
std::shared_ptr< EthPacketData > EthPacketPtr
Compute node info and storage for the very first connection from each node (used by the switch)