49#include "debug/DistEthernet.hh"
50#include "debug/DistEthernetPkt.hh"
73 inform(
"Next dist synchronisation tick is changed to %lu.\n",
nextAt);
77 panic(
"Dist synchronisation interval must be greater than zero");
81 inform(
"Dist synchronisation interval is changed to %lu.\n",
89 std::unique_lock<std::mutex> sync_lock(
lock);
106 nextAt = std::numeric_limits<Tick>::max();
107 nextRepeat = std::numeric_limits<Tick>::max();
120 nextAt = std::numeric_limits<Tick>::max();
121 nextRepeat = std::numeric_limits<Tick>::max();
128 std::unique_lock<std::mutex> sync_lock(
lock);
135 header.msgType = MsgType::cmdSyncReq;
149 auto lf = [
this]{
return waitNum == 0; };
150 cv.wait(sync_lock, lf);
160 std::unique_lock<std::mutex> sync_lock(
lock);
164 auto lf = [
this]{
return waitNum == 0; };
165 cv.wait(sync_lock, lf);
173 header.msgType = MsgType::cmdSyncAck;
178 header.needCkpt = ReqType::immediate;
181 header.needCkpt = ReqType::none;
185 header.needExit = ReqType::immediate;
187 header.needExit = ReqType::none;
192 header.needStopSync = ReqType::immediate;
194 header.needStopSync = ReqType::none;
207 std::unique_lock<std::mutex> sync_lock(
lock);
217 if (need_ckpt == ReqType::collective)
219 else if (need_ckpt == ReqType::immediate)
221 if (need_exit == ReqType::collective)
223 else if (need_exit == ReqType::immediate)
225 if (need_stop_sync == ReqType::collective)
227 else if (need_stop_sync == ReqType::immediate)
248 std::unique_lock<std::mutex> sync_lock(
lock);
255 doCkpt = (do_ckpt != ReqType::none);
256 doExit = (do_exit != ReqType::none);
272 std::lock_guard<std::mutex> sync_lock(
lock);
273 assert(req != ReqType::none);
275 warn(
"Ckpt requested multiple times (req:%d)\n",
static_cast<int>(req));
276 if (
needCkpt == ReqType::none || req == ReqType::immediate)
283 std::lock_guard<std::mutex> sync_lock(
lock);
284 assert(req != ReqType::none);
286 warn(
"Exit requested multiple times (req:%d)\n",
static_cast<int>(req));
287 if (
needExit == ReqType::none || req == ReqType::immediate)
308 int need_exit =
static_cast<int>(
needExit);
344 panic(
"DistIface::SyncEvent::start() aborted\n");
362 inform(
"Dist sync scheduled at %lu and repeats %lu\n",
when(),
380 "Distributed sync is hit while draining");
415 for (
auto *tc:
primary->sys->threads) {
419 warn_once(
"Tried to wake up thread in dist-gem5, but it "
420 "was already awake!\n");
449 assert(recv_tick >= prev_recv_tick + send_delay);
450 panic_if(prev_recv_tick + send_delay > recv_tick,
451 "Receive window is smaller than send delay");
453 "Simulators out of sync - missed packet receive by %llu ticks"
454 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
456 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
479 d.sendDelay =
d.packet->simLength;
504 DPRINTF(DistEthernetPkt,
"DistIface::recvScheduler::pushPacket "
505 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
506 send_tick, send_delay,
linkDelay, recv_tick);
508 assert(send_tick >
primary->syncEvent->when() -
520 descQueue.emplace(new_packet, send_tick, send_delay);
527 "Out of order packet received (recv_tick: %lu top(): %lu\n",
565 packet = std::make_shared<EthPacketData>();
566 packet->unserialize(
"rxPacket",
cp);
575 unsigned n_desc_queue =
descQueue.size();
576 assert(tmp_queue.size() ==
descQueue.size());
578 for (
int i = 0;
i < n_desc_queue;
i++) {
579 tmp_queue.front().serializeSection(
cp,
csprintf(
"rxDesc_%d",
i));
582 assert(tmp_queue.empty());
594 unsigned n_desc_queue;
596 for (
int i = 0;
i < n_desc_queue;
i++) {
610 bool is_switch,
int num_nodes) :
615 DPRINTF(DistEthernet,
"DistIface() ctor rank:%d\n",dist_rank);
618 assert(
sync ==
nullptr);
655 header.msgType = MsgType::dataDescriptor;
657 header.sendDelay = send_delay;
659 header.dataPacketLength = pkt->length;
660 header.simLength = pkt->simLength;
666 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
667 pkt->length, send_delay);
698 if (
header.msgType == MsgType::dataDescriptor) {
723 const_cast<Event *
>(recv_done),
731 DPRINTF(DistEthernet,
"DistIFace::drain() called\n");
740 DPRINTF(DistEthernet,
"DistIFace::drainResume() called\n");
752 sync->drainComplete();
761 sync->serializeSection(
cp,
"Sync");
768 unsigned rank_orig, dist_iface_id_orig;
772 panic_if(
rank != rank_orig,
"Rank mismatch at resume (rank=%d, orig=%d)",
775 "at resume (distIfaceId=%d, orig=%d)",
distIfaceId,
780 sync->unserializeSection(
cp,
"Sync");
801 assert(
sync !=
nullptr);
808 rng->init(5489 * (
rank+1) + 257);
814 DPRINTF(DistEthernet,
"DistIface::startup() started\n");
818 DPRINTF(DistEthernet,
"DistIface::startup() done\n");
825 DPRINTF(DistEthernet,
"DistIface::readyToCkpt() called, delay:%lu "
826 "period:%lu\n", delay, period);
829 inform(
"m5 checkpoint called with zero delay => triggering collaborative "
831 sync->requestCkpt(ReqType::collective);
833 inform(
"m5 checkpoint called with non-zero delay => triggering immediate "
834 "checkpoint (at the next sync)\n");
835 sync->requestCkpt(ReqType::immediate);
838 inform(
"Non-zero period for m5_ckpt is ignored in "
839 "distributed gem5 runs\n");
848 std::lock_guard<std::mutex> sync_lock(
lock);
862 if (
primary->syncEvent->scheduled()) {
863 inform(
"Request toggling syncronization off\n");
864 primary->sync->requestStopSync(ReqType::collective);
870 for (
auto *tc:
primary->sys->threads) {
875 inform(
"Request toggling syncronization on\n");
882 for (
auto *tc:
primary->sys->threads) {
893 DPRINTF(DistEthernet,
"DistIface::readyToExit() called, delay:%lu\n",
897 if (!
primary->syncEvent->scheduled())
901 inform(
"m5 exit called with zero delay => triggering collaborative "
903 sync->requestExit(ReqType::collective);
905 inform(
"m5 exit called with non-zero delay => triggering immediate "
906 "exit (at the next sync)\n");
907 sync->requestExit(ReqType::immediate);
921 warn(
"Dist-rank parameter is queried in single gem5 simulation.");
934 warn(
"Dist-size parameter is queried in single gem5 simulation.");
void reschedule(Tick when)
Tick prevRecvTick
The tick when the most recent receive event was processed.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
bool ckptRestore
Flag to set if receive ticks for pending packets need to be recalculated due to changed link latencie...
std::queue< Desc > descQueue
The queue to store the receive descriptors.
Tick linkDelay
The link delay in ticks for the simulated Ethernet link.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
EventManager * eventManager
The event manager associated with the simulated Ethernet link.
Event * recvDone
The receive done event for the simulated Ethernet link.
The global event to schedule periodic dist sync.
bool _draining
Flag to set when the system is draining.
void start()
Schedule the first periodic sync event.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void requestExit(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
ReqType needStopSync
Sync stop requested.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
void requestCkpt(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
ReqType needCkpt
Ckpt requested.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
ReqType needExit
Exit requested.
bool run(bool same_tick) override
Core method to perform a full dist sync.
SyncSwitch(int num_nodes)
unsigned numExitReq
Counter for recording exit requests.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
unsigned numCkptReq
Counter for recording ckpt requests.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
unsigned numStopSyncReq
Counter for recording stop sync requests.
unsigned numNodes
Number of connected simulated nodes.
This class implements global sync operations among gem5 peer processes.
bool isAbort
Flag is set if the sync is aborted (e.g.
Tick nextRepeat
The repeat value for the next periodic sync.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
unsigned waitNum
Number of receiver threads that not yet completed the current global synchronisation.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
bool doExit
Flag is set if exit is permitted upon sync completion.
virtual bool run(bool same_tick)=0
Core method to perform a full dist sync.
std::mutex lock
The lock to protect access to the Sync object.
std::condition_variable cv
Condition variable for the simulation thread to wait on until all receiver threads completes the curr...
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
bool doCkpt
Flag is set if taking a ckpt is permitted upon sync completion.
The interface class to talk to peer gem5 processes.
static uint64_t rankParam()
Getter for the dist rank param.
virtual void initTransport()=0
Init hook for the underlaying transport.
RecvScheduler recvScheduler
Meta information about data packets received.
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void init(const Event *e, Tick link_delay)
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
static uint64_t sizeParam()
Getter for the dist size param.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects.These are the SimObjects that are pa...
std::thread * recvThread
Receiver thread pointer.
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
void drainResume() override
Resume execution after a successful drain.
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
static bool isSwitch
Is this node a switch?
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
Tick syncRepeat
Frequency of dist sync events in ticks.
Tick syncStart
Tick to schedule the first dist sync event.
DistHeaderPkt::Header Header
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
unsigned size
The number of gem5 processes comprising this dist simulation.
DistHeaderPkt::ReqType ReqType
void lock()
Provide an interface for locking/unlocking the event queue.
ThreadContext is the external interface to all thread state for anything outside of the CPU.
virtual System * getSystemPtr()=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
void quiesce()
Quiesce thread context.
@ Suspended
Temporarily inactive.
virtual Status status() const =0
DrainState
Object drain/handover states.
@ Drained
Buffers drained, ready for serialization/handover.
#define panic(...)
This implements a cprintf based panic() function.
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize an a child object.
Copyright (c) 2024 Arm Limited All rights reserved.
Tick curTick()
The universal simulation clock.
std::ostream CheckpointOut
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
The "old style" exitSimLoop functions.
uint64_t Tick
Tick count type.
std::string csprintf(const char *format, const Args &...args)
EventQueue * curEventQueue()
std::shared_ptr< EthPacketData > EthPacketPtr
#define UNSERIALIZE_SCALAR(scalar)
#define SERIALIZE_SCALAR(scalar)
Received packet descriptor.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.