46#include <netinet/tcp.h>
47#include <sys/socket.h>
58#include "debug/DistEthernet.hh"
59#include "debug/DistEthernetCmd.hh"
62#if defined(__FreeBSD__)
63#include <netinet/in.h>
68#if defined(__APPLE__) || defined(__MACH__)
70#define MSG_NOSIGNAL SO_NOSIGPIPE
83 unsigned dist_rank,
unsigned dist_size,
87 DistIface(dist_rank, dist_size, sync_start, sync_repeat,
em, use_pseudo_op,
88 is_switch, num_nodes), serverName(server_name),
89 serverPort(server_port), isSwitch(is_switch), listening(false)
93 DPRINTF(DistEthernet,
"TCPIface(listen): Can't bind port %d\n",
104 for (
int i = 0;
i <
size;
i++) {
106 DPRINTF(DistEthernet,
"First connection, waiting for link info\n");
108 panic(
"Failed to receive link info");
118 panic(
"Socket already listening!");
120 struct sockaddr_in sockaddr;
123 fdStatic = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
126 sockaddr.sin_family = PF_INET;
127 sockaddr.sin_addr.s_addr = INADDR_ANY;
128 sockaddr.sin_port = htons(port);
130 memset(&sockaddr.sin_zero, 0,
sizeof(sockaddr.sin_zero));
131 ret = ::bind(
fdStatic, (
struct sockaddr *)&sockaddr,
sizeof (sockaddr));
134 if (ret == -1 && errno != EADDRINUSE)
135 panic(
"ListenSocket(listen): bind() failed!");
140 if (errno != EADDRINUSE)
141 panic(
"ListenSocket(listen): listen() failed!");
154 static unsigned cur_rank = 0;
155 static unsigned cur_id = 0;
163 return cn.first.rank == cur_rank;
165 assert(iface0 !=
nodes.end());
166 assert(iface0->first.distIfaceId == 0);
167 sock = iface0->second;
171 DPRINTF(DistEthernet,
"Next connection, waiting for link info\n");
173 panic(
"Failed to receive link info");
174 assert(
ni.rank == cur_rank);
175 assert(
ni.distIfaceId == cur_id);
177 inform(
"Link okay (iface:%d -> (node:%d, iface:%d))",
179 if (
ni.distIfaceId <
ni.distIfaceNum - 1) {
196 DPRINTF(DistEthernet,
"Connected, waiting for ack (distIfaceId:%d\n",
199 panic(
"Failed to receive ack");
210 struct sockaddr_in sockaddr;
211 socklen_t slen =
sizeof (sockaddr);
215 if (setsockopt(
sock, IPPROTO_TCP, TCP_NODELAY, (
char *)&
i,
217 warn(
"ListenSocket(accept): setsockopt() TCP_NODELAY failed!");
224 struct addrinfo addr_hint, *addr_results;
227 std::string port_str = std::to_string(
serverPort);
229 sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
230 panic_if(
sock < 0,
"socket() failed: %s", strerror(errno));
233 if (setsockopt(
sock, IPPROTO_TCP, TCP_NODELAY, (
char *)&fl,
sizeof(fl)) < 0)
234 warn(
"ConnectSocket(connect): setsockopt() TCP_NODELAY failed!");
236 bzero(&addr_hint,
sizeof(addr_hint));
237 addr_hint.ai_family = AF_INET;
238 addr_hint.ai_socktype = SOCK_STREAM;
239 addr_hint.ai_protocol = IPPROTO_TCP;
241 ret = getaddrinfo(
serverName.c_str(), port_str.c_str(),
242 &addr_hint, &addr_results);
243 panic_if(ret < 0,
"getaddrinf() failed: %s", strerror(errno));
245 DPRINTF(DistEthernet,
"Connecting to %s:%s\n",
248 ret =
::connect(
sock, (
struct sockaddr *)(addr_results->ai_addr),
249 addr_results->ai_addrlen);
250 panic_if(ret < 0,
"connect() failed: %s", strerror(errno));
252 freeaddrinfo(addr_results);
257 [[maybe_unused]]
int ret;
268 ret = ::send(
sock, buf, length, MSG_NOSIGNAL);
270 if (errno == ECONNRESET || errno == EPIPE) {
271 exitSimLoop(
"Message server closed connection, simulation "
274 panic(
"send() failed: %s", strerror(errno));
277 panic_if(ret != length,
"send() failed");
285 ret = ::recv(
sock, buf, length, MSG_WAITALL );
287 if (errno == ECONNRESET || errno == EPIPE)
288 inform(
"recv(): %s", strerror(errno));
290 panic(
"recv() failed: %s", strerror(errno));
291 }
else if (ret == 0) {
292 inform(
"recv(): Connection closed");
293 }
else if (ret != length)
294 panic(
"recv() failed");
296 return (ret == length);
309 DPRINTF(DistEthernetCmd,
"TCPIface::sendCmd() type: %d\n",
310 static_cast<int>(
header.msgType));
322 DPRINTF(DistEthernetCmd,
"TCPIface::recvHeader() type: %d ret: %d\n",
323 static_cast<int>(
header.msgType), ret);
330 packet = std::make_shared<EthPacketData>(
header.dataPacketLength);
332 panic_if(!ret,
"Error while reading socket");
333 packet->simLength =
header.simLength;
334 packet->length =
header.dataPacketLength;
Defines global host-dependent types: Counter, Tick, and (indirectly) {int,uint}{8,...
The interface class to talk to peer gem5 processes.
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
unsigned size
The number of gem5 processes comprising this dist simulation.
bool recvHeader(Header &header) override
Receive a header (i.e.
void initTransport() override
Init hook for the underlaying transport.
static std::vector< int > sockRegistry
Storage for all opened sockets.
static std::vector< std::pair< NodeInfo, int > > nodes
void sendPacket(const Header &header, const EthPacketPtr &packet) override
Send out a data packet to the remote end.
bool recvTCP(int sock, void *buf, unsigned length)
Receive the next incoming message through a TCP stream socket.
void recvPacket(const Header &header, EthPacketPtr &packet) override
Receive a packet from the remote end.
TCPIface(std::string server_name, unsigned server_port, unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
The ctor creates and connects the stream socket to the server.
int sock
The stream socket to connect to the server.
void establishConnection()
void sendCmd(const Header &header) override
Send out a control command to the remote end.
void sendTCP(int sock, const void *buf, unsigned length)
Send out a message through a TCP stream socket.
#define panic(...)
This implements a cprintf based panic() function.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Copyright (c) 2024 - Pranith Kumar Copyright (c) 2020 Inria All rights reserved.
uint64_t Tick
Tick count type.
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
std::shared_ptr< EthPacketData > EthPacketPtr
Compute node info and storage for the very first connection from each node (used by the switch)