gem5 v24.1.0.1
Loading...
Searching...
No Matches
dist_iface.cc
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015-2016 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 */
37
38/* @file
39 * The interface class for dist-gem5 simulations.
40 */
41
42#include "dev/net/dist_iface.hh"
43
44#include <queue>
45#include <thread>
46
47#include "base/trace.hh"
48#include "cpu/thread_context.hh"
49#include "debug/DistEthernet.hh"
50#include "debug/DistEthernetPkt.hh"
51#include "dev/net/etherpkt.hh"
52#include "sim/cur_tick.hh"
53#include "sim/sim_exit.hh"
54#include "sim/sim_object.hh"
55#include "sim/system.hh"
56
57namespace gem5
58{
59
// Process-wide (static) DistIface state shared by every DistIface in this
// gem5 process. The first DistIface constructed allocates sync and
// syncEvent and sets primary and isSwitch; every constructor increments
// distIfaceNum; sys is filled in by toggleSync().
60DistIface::Sync *DistIface::sync = nullptr;
61System *DistIface::sys = nullptr;
62DistIface::SyncEvent *DistIface::syncEvent = nullptr;
63unsigned DistIface::distIfaceNum = 0;
64unsigned DistIface::recvThreadsNum = 0;
65DistIface *DistIface::primary = nullptr;
66bool DistIface::isSwitch = false;
67
// Fold one interface's periodic-sync parameters into the shared Sync object.
// It keeps the smallest start tick and the smallest repeat interval seen so
// far, and panics if repeat_tick is 0.
68void
69DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
70{
71 if (start_tick < nextAt) {
72 nextAt = start_tick;
73 inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
74 }
75
76 if (repeat_tick == 0)
77 panic("Dist synchronisation interval must be greater than zero");
78
79 if (repeat_tick < nextRepeat) {
80 nextRepeat = repeat_tick;
81 inform("Dist synchronisation interval is changed to %lu.\n",
// NOTE(review): source line 82 is missing from this listing, so the
// inform() call above appears unterminated; it presumably passes
// nextRepeat. Confirm against dist_iface.cc.
83 }
84}
85
// Sync::abort(). Its signature (source line 87) is missing from this listing.
// Under the sync lock it marks the on-going sync as aborted and clears
// waitNum, then wakes one thread blocked on cv so it can observe isAbort.
// The receiver thread calls this when the peer connection is lost.
86void
88{
89 std::unique_lock<std::mutex> sync_lock(lock);
90 waitNum = 0;
91 isAbort = true;
92 sync_lock.unlock();
93 cv.notify_one();
94}
95
// Constructor body whose signature (source line 96) is missing from this
// listing. It takes num_nodes, so it is presumably
// SyncSwitch::SyncSwitch(int num_nodes); confirm.
// The switch starts out expecting one sync request from each of the
// num_nodes peers (waitNum), with no requests counted. nextAt/nextRepeat
// start at their maximum so Sync::init() can lower them.
97{
98 numNodes = num_nodes;
99 waitNum = num_nodes;
100 numExitReq = 0;
101 numCkptReq = 0;
102 numStopSyncReq = 0;
103 doExit = false;
104 doCkpt = false;
105 doStopSync = false;
106 nextAt = std::numeric_limits<Tick>::max();
107 nextRepeat = std::numeric_limits<Tick>::max();
108 isAbort = false;
109}
110
// Constructor body whose signature (source line 111) is missing from this
// listing; presumably SyncNode::SyncNode(). Confirm.
// A node starts with no outstanding sync (waitNum == 0) and no pending
// checkpoint/exit/stop-sync requests. nextAt/nextRepeat start at their
// maximum so Sync::init() can lower them.
112{
113 waitNum = 0;
114 needExit = ReqType::none;
115 needCkpt = ReqType::none;
116 needStopSync = ReqType::none;
117 doExit = false;
118 doCkpt = false;
119 doStopSync = false;
120 nextAt = std::numeric_limits<Tick>::max();
121 nextRepeat = std::numeric_limits<Tick>::max();
122 isAbort = false;
123}
124
// Node-side global synchronisation. The signature (source line 126) is
// missing from this listing; presumably SyncNode::run(bool same_tick).
// Under the sync lock it fills a cmdSyncReq header with:
//   - the current tick,
//   - the local repeat interval, and
//   - any checkpoint/exit/stop-sync requests,
// and marks those requests as pending. It then blocks on cv until waitNum
// is driven to 0 by progress() or abort(). When same_tick is true, the
// agreed nextAt is expected to equal the current tick. Returns false if the
// sync was aborted.
// NOTE(review): source lines 129, 133 and 147 are also missing. As listed:
//   - `header` is never declared,
//   - waitNum is asserted to be 0 and never re-armed, and
//   - the request is never sent,
// so the wait below would return at once. Those lines presumably declare
// the header, set waitNum and send the command; confirm against the source.
125bool
127{
128 std::unique_lock<std::mutex> sync_lock(lock);
130
131 assert(waitNum == 0);
132 assert(!isAbort);
134 // initiate the global synchronisation
135 header.msgType = MsgType::cmdSyncReq;
136 header.sendTick = curTick();
137 header.syncRepeat = nextRepeat;
138 header.needCkpt = needCkpt;
139 header.needStopSync = needStopSync;
140 if (needCkpt != ReqType::none)
141 needCkpt = ReqType::pending;
142 header.needExit = needExit;
143 if (needExit != ReqType::none)
144 needExit = ReqType::pending;
145 if (needStopSync != ReqType::none)
146 needStopSync = ReqType::pending;
148 // now wait until all receiver threads complete the synchronisation
149 auto lf = [this]{ return waitNum == 0; };
150 cv.wait(sync_lock, lf);
151 // global synchronisation is done.
152 assert(isAbort || !same_tick || (nextAt == curTick()));
153 return !isAbort;
154}
155
156
// Switch-side global synchronisation. The signature (source line 158) is
// missing from this listing; presumably SyncSwitch::run(bool same_tick).
// It waits until progress() has merged in every node's request
// (waitNum == 0), re-arms waitNum for the next round, and builds a
// cmdSyncAck header with the agreed nextAt/nextRepeat.
// A checkpoint, exit or stop-sync is granted when either:
//   - some node asked for it immediately (the do* flag is already set), or
//   - all numNodes nodes asked collectively.
// The checkpoint and stop-sync counters are reset once granted; numExitReq
// is not reset here. Returns false if the sync was aborted.
// NOTE(review): source lines 161 and 196 are also missing. Line 161
// presumably declares `header`; line 196 presumably sends it to the nodes.
// Confirm.
157bool
159{
160 std::unique_lock<std::mutex> sync_lock(lock);
162 // Wait for the sync requests from the nodes
163 if (waitNum > 0) {
164 auto lf = [this]{ return waitNum == 0; };
165 cv.wait(sync_lock, lf);
166 }
167 assert(waitNum == 0);
168 if (isAbort) // sync aborted
169 return false;
170 assert(!same_tick || (nextAt == curTick()));
171 waitNum = numNodes;
172 // Complete the global synchronisation
173 header.msgType = MsgType::cmdSyncAck;
174 header.sendTick = nextAt;
175 header.syncRepeat = nextRepeat;
176 if (doCkpt || numCkptReq == numNodes) {
177 doCkpt = true;
178 header.needCkpt = ReqType::immediate;
179 numCkptReq = 0;
180 } else {
181 header.needCkpt = ReqType::none;
182 }
183 if (doExit || numExitReq == numNodes) {
184 doExit = true;
185 header.needExit = ReqType::immediate;
186 } else {
187 header.needExit = ReqType::none;
188 }
189 if (doStopSync || numStopSyncReq == numNodes) {
190 doStopSync = true;
191 numStopSyncReq = 0;
192 header.needStopSync = ReqType::immediate;
193 } else {
194 header.needStopSync = ReqType::none;
195 }
197 return true;
198}
199
// Receiver-thread callback on the switch for one node's sync request. The
// first signature line (source line 201) is missing from this listing; the
// remaining parameters follow.
// It merges the request into the on-going sync:
//   - nextAt becomes the largest send_tick seen,
//   - nextRepeat becomes the smallest sync_repeat seen,
//   - collective requests are counted, and immediate ones set the
//     corresponding do* flag.
// When the last expected request arrives (waitNum hits 0), it wakes the
// simulation thread blocked in run(). Returns false only if the sync was
// aborted.
200bool
202 Tick sync_repeat,
203 ReqType need_ckpt,
204 ReqType need_exit,
205 ReqType need_stop_sync)
206{
207 std::unique_lock<std::mutex> sync_lock(lock);
208 if (isAbort) // sync aborted
209 return false;
210 assert(waitNum > 0);
211
212 if (send_tick > nextAt)
213 nextAt = send_tick;
214 if (nextRepeat > sync_repeat)
215 nextRepeat = sync_repeat;
216
217 if (need_ckpt == ReqType::collective)
218 numCkptReq++;
219 else if (need_ckpt == ReqType::immediate)
220 doCkpt = true;
221 if (need_exit == ReqType::collective)
222 numExitReq++;
223 else if (need_exit == ReqType::immediate)
224 doExit = true;
225 if (need_stop_sync == ReqType::collective)
226 numStopSyncReq++;
227 else if (need_stop_sync == ReqType::immediate)
228 doStopSync = true;
229
230 waitNum--;
231 // Notify the simulation thread if the on-going sync is complete
232 if (waitNum == 0) {
233 sync_lock.unlock();
234 cv.notify_one();
235 }
236 // The receive thread must keep alive in the switch until the node
237 // closes the connection. Thus, we always return true here.
238 return true;
239}
240
// Receiver-thread callback on a node for the switch's sync ack. The first
// signature line (source line 242) is missing from this listing; it
// presumably declares max_send_tick, which the body uses.
// It adopts the switch's decision: nextAt/nextRepeat are overwritten, and
// each do* flag is set when the ack carries a request other than
// ReqType::none. It wakes the simulation thread when waitNum reaches 0.
// Returns false, ending the receiver thread, once an exit has been agreed
// or if the sync was aborted.
241bool
243 Tick next_repeat,
244 ReqType do_ckpt,
245 ReqType do_exit,
246 ReqType do_stop_sync)
247{
248 std::unique_lock<std::mutex> sync_lock(lock);
249 if (isAbort) // sync aborted
250 return false;
251 assert(waitNum > 0);
252
253 nextAt = max_send_tick;
254 nextRepeat = next_repeat;
255 doCkpt = (do_ckpt != ReqType::none);
256 doExit = (do_exit != ReqType::none);
257 doStopSync = (do_stop_sync != ReqType::none);
258
259 waitNum--;
260 // Notify the simulation thread if the on-going sync is complete
261 if (waitNum == 0) {
262 sync_lock.unlock();
263 cv.notify_one();
264 }
265 // The receive thread must finish when simulation is about to exit
266 return !doExit;
267}
268
// Record a local checkpoint request to be sent with the next sync request.
// The signature (source line 270) is missing from this listing. An
// immediate request replaces any earlier one; otherwise the first request
// is kept. Repeated requests only produce a warning.
269void
271{
272 std::lock_guard<std::mutex> sync_lock(lock);
273 assert(req != ReqType::none);
274 if (needCkpt != ReqType::none)
275 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
276 if (needCkpt == ReqType::none || req == ReqType::immediate)
277 needCkpt = req;
278}
279
// Record a local exit request to be sent with the next sync request. The
// signature (source line 281) is missing from this listing. An immediate
// request replaces any earlier one; otherwise the first request is kept.
// Repeated requests only produce a warning.
280void
282{
283 std::lock_guard<std::mutex> sync_lock(lock);
284 assert(req != ReqType::none);
285 if (needExit != ReqType::none)
286 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
287 if (needExit == ReqType::none || req == ReqType::immediate)
288 needExit = req;
289}
290
// Called before a checkpoint is written. The signature (source line 292) is
// missing from this listing. If a checkpoint was agreed in the last sync,
// it performs one more global sync so the underlying physical network is
// drained, then clears doCkpt so that only the first caller does this.
// NOTE(review): the result of run(false), which is false on abort, is
// ignored here.
291void
293{
294 if (doCkpt) {
295 // The first DistIface object called this right before writing the
296 // checkpoint. We need to drain the underlying physical network here.
297 // Note that other gem5 peers may enter this barrier at different
298 // ticks due to draining.
299 run(false);
300 // Only the "first" DistIface object has to perform the sync
301 doCkpt = false;
302 }
303}
304
// Checkpoint the pending exit request. The signature (source line 306) is
// missing from this listing. The ReqType is stored as a plain int.
305void
307{
308 int need_exit = static_cast<int>(needExit);
309 SERIALIZE_SCALAR(need_exit);
310}
311
// Restore the pending exit request saved by the matching serialize(). The
// signature (source line 313) is missing from this listing. The stored int
// is cast back to ReqType.
312void
314{
315 int need_exit;
316 UNSERIALIZE_SCALAR(need_exit);
317 needExit = static_cast<ReqType>(need_exit);
318}
319
// NOTE(review): source lines 321-324 and 327-330 are missing from this
// listing; only the `void` return-type lines of what are presumably two
// short definitions survive. They cannot be reviewed here; confirm against
// dist_iface.cc.
320void
325
326void
331
// SyncEvent::start(). The signature (source line 333) is missing from this
// listing. It runs a global barrier (run(false)) so all peers agree on the
// first sync tick and the repeat interval, then (re)schedules this event at
// the agreed tick. Panics if the barrier is aborted.
332void
334{
335 // Note that this may be called either from startup() or drainResume()
336
337 // At this point, all DistIface objects have already called Sync::init() so
338 // we have a local minimum of the start tick and repeat for the periodic
339 // sync.
340 repeat = DistIface::sync->nextRepeat;
341 // Do a global barrier to agree on a common repeat value (the smallest
342 // one from all participating nodes).
343 if (!DistIface::sync->run(false))
344 panic("DistIface::SyncEvent::start() aborted\n");
345
346 assert(!DistIface::sync->doCkpt);
347 assert(!DistIface::sync->doExit);
348 assert(!DistIface::sync->doStopSync);
349 assert(DistIface::sync->nextAt >= curTick());
350 assert(DistIface::sync->nextRepeat <= repeat);
351
352 if (curTick() == 0)
353 assert(!scheduled());
354
355 // Use the maximum of the current tick for all participating nodes or a
356 // user provided starting tick.
357 if (scheduled())
358 reschedule(DistIface::sync->nextAt);
359 else
360 schedule(DistIface::sync->nextAt);
361
362 inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
363 DistIface::sync->nextRepeat);
364}
365
// SyncEvent::process(). The signature (source line 367) is missing from
// this listing. It performs the periodic same-tick global sync, then acts
// on the agreed outcome:
//   - checkpoint: request a simulation-loop exit for the checkpoint;
//   - exit: request a simulation-loop exit;
//   - stop-sync: stop periodic syncing;
//   - otherwise: reschedule itself nextRepeat ticks later.
366void
368{
369 // We may not start a global periodic sync while draining before taking a
370 // checkpoint. This is due to the possibility that peer gem5 processes
371 // may not hit the same periodic sync before they complete draining and
372 // that would make this periodic sync clash with sync called from
373 // DistIface::serialize() by other gem5 processes.
374 // We would need a 'distributed drain' solution to eliminate this
375 // restriction.
376 // Note that if draining was not triggered by checkpointing then we are
377 // fine since no extra global sync will happen (i.e. all peer gem5 will
378 // hit this periodic sync eventually).
379 panic_if(_draining && DistIface::sync->doCkpt,
380 "Distributed sync is hit while draining");
381 /*
382 * Note that this is a global event so this process method will be called
383 * by exactly one thread.
384 */
385 /*
386 * We hold the eventq lock at this point but the receiver thread may
387 * need the lock to schedule new recv events while waiting for the
388 * dist sync to complete.
389 * Note that the other simulation threads also release their eventq
390 * locks while waiting for us due to the global event semantics.
391 */
392 {
// NOTE(review): source line 393 is missing from this listing. Per the
// comment above, it presumably releases the event-queue lock for the
// duration of this scope. Confirm.
394 // we do a global sync here that is supposed to happen at the same
395 // tick in all gem5 peers
396 if (!DistIface::sync->run(true))
397 return; // global sync aborted
398 // global sync completed
399 }
400 if (DistIface::sync->doCkpt)
401 exitSimLoop("checkpoint");
402 if (DistIface::sync->doExit) {
403 exitSimLoop("exit request from gem5 peers");
404 return;
405 }
406 if (DistIface::sync->doStopSync) {
// NOTE(review): source lines 407 and 411 are missing. The `} else {`
// below implies line 411 opens an `if`, presumably on isSwitch given the
// surrounding comments. Confirm.
408 inform("synchronization disabled at %lu\n", curTick());
409
410 // The switch node needs to wait for the next sync immediately.
412 start();
413 } else {
414 // Wake up thread contexts on non-switch nodes.
415 for (auto *tc: primary->sys->threads) {
416 if (tc->status() == ThreadContext::Suspended)
417 tc->activate();
418 else
419 warn_once("Tried to wake up thread in dist-gem5, but it "
420 "was already awake!\n");
421 }
422 }
423 return;
424 }
425 // schedule the next periodic sync
426 repeat = DistIface::sync->nextRepeat;
427 schedule(curTick() + repeat);
428}
429
// RecvScheduler::init(). The signature (source line 431) is missing from
// this listing. It binds the calling receiver thread to the link's event
// queue and stores the receive-done event and link delay used to schedule
// incoming packets.
430void
432{
433 // This is called from the receiver thread when it starts running. The new
434 // receiver thread shares the event queue with the simulation thread
435 // (associated with the simulated Ethernet link).
436 curEventQueue(eventManager->eventQueue());
437
438 recvDone = recv_done;
439 linkDelay = link_delay;
440}
441
442Tick
444 Tick send_delay,
445 Tick prev_recv_tick)
446{
447 Tick recv_tick = send_tick + send_delay + linkDelay;
448 // sanity check (we need atleast a send delay long window)
449 assert(recv_tick >= prev_recv_tick + send_delay);
450 panic_if(prev_recv_tick + send_delay > recv_tick,
451 "Receive window is smaller than send delay");
452 panic_if(recv_tick <= curTick(),
453 "Simulators out of sync - missed packet receive by %llu ticks"
454 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
455 "linkDelay: %lu )",
456 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
457 linkDelay);
458
459 return recv_tick;
460}
461
// RecvScheduler::resumeRecvTicks(). The signature (source line 463) is
// missing from this listing. This only acts after a checkpoint restore
// (ckptRestore). It:
//   - re-stamps every pending descriptor with the current tick and a send
//     delay of packet->simLength, preserving queue order;
//   - moves an already-scheduled recvDone to the current tick;
//   - clears ckptRestore.
462void
464{
465 // Schedule pending packets asap in case link speed/delay changed when
466 // restoring from the checkpoint.
467 // This may be done during unserialize except that curTick() is unknown
468 // so we call this during drainResume().
469 // If we are not restoring from a checkpoint then link latency could not
470 // change so we just return.
471 if (!ckptRestore)
472 return;
473
// NOTE(review): source line 474 is missing; `v` used below has no visible
// declaration. It is presumably a local container of Desc. Confirm.
475 while (!descQueue.empty()) {
476 Desc d = descQueue.front();
477 descQueue.pop();
478 d.sendTick = curTick();
479 d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
480 v.push_back(d);
481 }
482
483 for (auto &d : v)
484 descQueue.push(d);
485
486 if (recvDone->scheduled()) {
487 assert(!descQueue.empty());
488 eventManager->reschedule(recvDone, curTick());
489 } else {
490 assert(descQueue.empty() && v.empty());
491 }
492 ckptRestore = false;
493}
494
// RecvScheduler::pushPacket(). The first signature line (source line 496)
// is missing from this listing; it presumably declares new_packet, which
// the body uses. Called from the receiver thread: it takes the event-queue
// lock, queues the packet's descriptor, and schedules recvDone if this is
// the only queued packet. Otherwise it panics if the new packet would
// arrive before the head of the queue.
495void
497 Tick send_tick,
498 Tick send_delay)
499{
500 // Note : this is called from the receiver thread
501 curEventQueue()->lock();
502 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
503
504 DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
505 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
506 send_tick, send_delay, linkDelay, recv_tick);
507 // Every packet must be sent and arrive in the same quantum
508 assert(send_tick > primary->syncEvent->when() -
// NOTE(review): source line 509 (the rest of the assert above) is missing
// from this listing.
510 // No packet may be scheduled for receive in the arrival quantum
511 assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
512
513 // Now we are about to schedule a recvDone event for the new data packet.
514 // We use the same recvDone object for all incoming data packets. Packet
515 // descriptors are saved in the ordered queue. The currently scheduled
516 // packet is always on the top of the queue.
517 // NOTE: we use the event queue lock to protect the receive desc queue,
518 // too, which is accessed both by the receiver thread and the simulation
519 // thread.
520 descQueue.emplace(new_packet, send_tick, send_delay);
521 if (descQueue.size() == 1) {
522 assert(!recvDone->scheduled());
523 eventManager->schedule(recvDone, recv_tick);
524 } else {
525 assert(recvDone->scheduled());
526 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
527 "Out of order packet received (recv_tick: %lu top(): %lu\n",
528 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
529 }
// NOTE(review): source line 530 is missing. The event-queue lock taken at
// the top has no visible unlock, so that line presumably releases it.
// Confirm.
531}
532
// RecvScheduler::popPacket(). The signature (source lines 533-534) is
// missing from this listing. It:
//   - removes and returns the packet at the head of the descriptor queue;
//   - schedules recvDone for the next queued packet, if any;
//   - records the current tick as prevRecvTick.
535{
536 // Note : this is called from the simulation thread when a receive done
537 // event is being processed for the link. We assume that the thread holds
538 // the event queue lock when this is called!
539 EthPacketPtr next_packet = descQueue.front().packet;
540 descQueue.pop();
541
542 if (descQueue.size() > 0) {
543 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
544 descQueue.front().sendDelay,
545 curTick());
546 eventManager->schedule(recvDone, recv_tick);
547 }
548 prevRecvTick = curTick();
549 return next_packet;
550}
551
// Desc::serialize(). The signature (source line 553) is missing from this
// listing. It stores the descriptor's send tick and send delay, then the
// packet itself under the name "rxPacket".
552void
554{
555 SERIALIZE_SCALAR(sendTick);
556 SERIALIZE_SCALAR(sendDelay);
557 packet->serialize("rxPacket", cp);
558}
559
// Desc::unserialize(). The signature (source line 561) is missing from this
// listing. It restores the send tick and send delay, allocates a fresh
// EthPacketData and fills it from the "rxPacket" data.
560void
562{
563 UNSERIALIZE_SCALAR(sendTick);
564 UNSERIALIZE_SCALAR(sendDelay);
565 packet = std::make_shared<EthPacketData>();
566 packet->unserialize("rxPacket", cp);
567}
568
// RecvScheduler::serialize(). The signature (source line 570) is missing
// from this listing. It stores prevRecvTick and every pending receive
// descriptor, in queue order, as sections rxDesc_0 .. rxDesc_<n-1>. A copy
// of the queue is drained, so descQueue itself is left untouched.
569void
571{
572 SERIALIZE_SCALAR(prevRecvTick);
573 // serialize the receive desc queue
574 std::queue<Desc> tmp_queue(descQueue);
575 unsigned n_desc_queue = descQueue.size();
576 assert(tmp_queue.size() == descQueue.size());
577 SERIALIZE_SCALAR(n_desc_queue);
// NOTE(review): the int loop index is compared against the unsigned
// n_desc_queue (a signed/unsigned comparison).
578 for (int i = 0; i < n_desc_queue; i++) {
579 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
580 tmp_queue.pop();
581 }
582 assert(tmp_queue.empty());
583}
584
// RecvScheduler::unserialize(). The signature (source line 586) is missing
// from this listing. It expects an empty, idle scheduler, restores
// prevRecvTick and the pending descriptors in order, and sets ckptRestore
// so resumeRecvTicks() re-times them later.
585void
587{
588 assert(descQueue.size() == 0);
589 assert(!recvDone->scheduled());
590 assert(!ckptRestore);
591
592 UNSERIALIZE_SCALAR(prevRecvTick);
593 // unserialize the receive desc queue
594 unsigned n_desc_queue;
595 UNSERIALIZE_SCALAR(n_desc_queue);
596 for (int i = 0; i < n_desc_queue; i++) {
597 Desc recv_desc;
598 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
599 descQueue.push(recv_desc);
600 }
601 ckptRestore = true;
602}
603
// Construct a dist interface. The first DistIface created in the process
// becomes the primary: it allocates the shared Sync object (a SyncSwitch
// when is_switch, otherwise a SyncNode) and the SyncEvent, and records
// isSwitch. Every instance increments distIfaceNum.
604DistIface::DistIface(unsigned dist_rank,
605 unsigned dist_size,
606 Tick sync_start,
607 Tick sync_repeat,
// NOTE(review): source line 608 is missing. Per the declared signature it
// is the `EventManager *em` parameter used by recvScheduler(em) below.
609 bool use_pseudo_op,
610 bool is_switch, int num_nodes) :
611 syncStart(sync_start), syncRepeat(sync_repeat),
612 recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
613 rank(dist_rank), size(dist_size)
614{
615 DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
616 isPrimary = false;
617 if (primary == nullptr) {
618 assert(sync == nullptr);
619 assert(syncEvent == nullptr);
620 isSwitch = is_switch;
621 if (is_switch)
622 sync = new SyncSwitch(num_nodes);
623 else
624 sync = new SyncNode();
625 syncEvent = new SyncEvent();
626 primary = this;
627 isPrimary = true;
628 }
// NOTE(review): source line 629 is missing. distIfaceId, which
// unserialize() checks, has no visible assignment, so that line presumably
// sets it. Confirm.
630 distIfaceNum++;
631}
632
634{
635 assert(recvThread);
636 recvThread->join();
637 delete recvThread;
638 if (distIfaceNum-- == 0) {
639 assert(syncEvent);
640 delete syncEvent;
641 assert(sync);
642 delete sync;
643 }
644 if (this == primary)
645 primary = nullptr;
646}
647
// packetOut(). The signature (source line 649) is missing from this
// listing; it is declared as packetOut(EthPacketPtr pkt, Tick send_delay).
// It wraps the packet in a dataDescriptor header stamped with the current
// tick, send delay and packet lengths, and hands both to sendPacket().
648void
650{
// NOTE(review): source line 651 is missing; `header` has no visible
// declaration here and is presumably declared on that line.
652
653 // Prepare a dist header packet for the Ethernet packet we want to
654 // send out.
655 header.msgType = MsgType::dataDescriptor;
656 header.sendTick = curTick();
657 header.sendDelay = send_delay;
658
659 header.dataPacketLength = pkt->length;
660 header.simLength = pkt->simLength;
661
662 // Send out the packet and the meta info.
663 sendPacket(header, pkt);
664
665 DPRINTF(DistEthernetPkt,
666 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
667 pkt->length, send_delay);
668}
669
// Body of the per-interface receiver thread. It blocks on recvHeader() for
// each incoming header:
//   - data descriptors are received and queued in the receive scheduler;
//   - anything else is treated as a sync command and passed to
//     sync->progress().
// The loop ends when the connection is lost (after requesting a
// simulation-loop exit and aborting any on-going sync), or when progress()
// returns false.
670void
671DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
672{
673 EthPacketPtr new_packet;
// NOTE(review): source line 674 is missing; `header` has no visible
// declaration and is presumably declared there.
675
676 // Initialize receive scheduler parameters
677 recvScheduler.init(recv_done, link_delay);
678
679 // Main loop to wait for and process any incoming message.
680 for (;;) {
681 // recvHeader() blocks until the next dist header packet comes in.
682 if (!recvHeader(header)) {
683 // We lost connection to the peer gem5 processes most likely
684 // because one of them called m5 exit. So we stop here.
685 // Grab the eventq lock to stop the simulation thread
686 curEventQueue()->lock();
687 exitSimLoop("connection to gem5 peer got closed");
// NOTE(review): source line 688 is missing. The event-queue lock taken
// above has no visible unlock, so that line presumably releases it.
// Confirm.
689 // The simulation thread may be blocked in processing an on-going
690 // global synchronisation. Abort the sync to give the simulation
691 // thread a chance to make progress and process the exit event.
692 sync->abort();
693 // Finish receiver thread
694 break;
695 }
696
697 // We got a valid dist header packet, let's process it
698 if (header.msgType == MsgType::dataDescriptor) {
699 recvPacket(header, new_packet);
700 recvScheduler.pushPacket(new_packet,
701 header.sendTick,
702 header.sendDelay);
703 } else {
704 // everything else must be synchronisation related command
705 if (!sync->progress(header.sendTick,
706 header.syncRepeat,
707 header.needCkpt,
708 header.needExit,
709 header.needStopSync))
710 // Finish receiver thread if simulation is about to exit
711 break;
712 }
713 }
714}
715
// Start this interface's receiver thread running recvThreadFunc().
// recv_done is const-cast because the receive scheduler later schedules
// that event.
716void
717DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
718{
719 assert(recvThread == nullptr);
720
721 recvThread = new std::thread(&DistIface::recvThreadFunc,
722 this,
723 const_cast<Event *>(recv_done),
724 link_delay);
// NOTE(review): source line 725 is missing. recvThreadsNum has no visible
// increment, so that line presumably increments it. Confirm.
726}
727
// drain(). The signature (source lines 728-729) is missing from this
// listing. The primary interface flags the sync event as draining; the
// interface itself always reports Drained immediately. It may be called
// several times in one drain cycle.
730{
731 DPRINTF(DistEthernet,"DistIFace::drain() called\n");
732 // This can be called multiple times in the same drain cycle.
733 if (this == primary)
734 syncEvent->draining(true);
735 return DrainState::Drained;
736}
737
// drainResume(). Source line 739 (signature and opening brace) and source
// line 743 are missing from this listing. The visible part clears the
// draining flag on the sync event (primary only).
// NOTE(review): SyncEvent::start() says it may be called from
// drainResume(), and resumeRecvTicks() says it runs during drainResume().
// Neither call is visible, so line 743 presumably holds one of them.
// Confirm.
738void
740 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
741 if (this == primary)
742 syncEvent->draining(false);
744}
745
// DistIface::serialize(). The signature (source line 747) is missing from
// this listing. It records this interface's rank and id so unserialize()
// can check them, then the receive scheduler, and (primary only) the shared
// Sync state.
746void
748{
749 // Drain the dist interface before the checkpoint is taken. We cannot call
750 // this as part of the normal drain cycle because this dist sync has to be
751 // called exactly once after the system is fully drained.
// NOTE(review): source line 752, which the comment above describes, is
// missing from this listing.
753
754 unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
755
756 SERIALIZE_SCALAR(rank_orig);
757 SERIALIZE_SCALAR(dist_iface_id_orig);
758
759 recvScheduler.serializeSection(cp, "recvScheduler");
760 if (this == primary) {
761 sync->serializeSection(cp, "Sync");
762 }
763}
764
// DistIface::unserialize(). The signature (source line 766) is missing from
// this listing. It panics if the checkpoint was taken with a different rank
// or interface id, then restores the receive scheduler and (primary only)
// the shared Sync state.
765void
767{
768 unsigned rank_orig, dist_iface_id_orig;
769 UNSERIALIZE_SCALAR(rank_orig);
770 UNSERIALIZE_SCALAR(dist_iface_id_orig);
771
772 panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
773 rank, rank_orig);
774 panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
775 "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
776 dist_iface_id_orig);
777
778 recvScheduler.unserializeSection(cp, "recvScheduler");
779 if (this == primary) {
780 sync->unserializeSection(cp, "Sync");
781 }
782}
783
// Per-link initialization. It sets up the transport, spawns the receiver
// thread, folds this link's sync parameters into the shared Sync object,
// and (primary only) seeds the random generator with a rank-dependent value.
784void
785DistIface::init(const Event *done_event, Tick link_delay)
786{
787 // Init hook for the underlying message transport to setup/finalize
788 // communication channels
// NOTE(review): source line 789 is missing; it is presumably the transport
// init hook the comment above refers to. Confirm.
790
791 // Spawn a new receiver thread that will process messages
792 // coming in from peer gem5 processes.
793 // The receive thread will also schedule a (receive) doneEvent
794 // for each incoming data packet.
795 spawnRecvThread(done_event, link_delay);
796
797
798 // Adjust the periodic sync start and interval. Different DistIface
799 // might have different requirements. The singleton sync object
800 // will select the minimum values for both params.
801 assert(sync != nullptr);
// NOTE(review): source line 802 is missing. Per the comment above, it
// presumably passes syncStart/syncRepeat to sync->init(). Confirm.
803
804 // Initialize the seed for random generator to avoid the same sequence
805 // in all gem5 peer processes
806 assert(primary != nullptr);
807 if (this == primary)
808 rng->init(5489 * (rank+1) + 257);
809}
810
// startup(). The signature (source line 812) is missing from this listing.
// The primary interface starts periodic synchronization here, unless this
// is a non-switch node configured to start it via the pseudo op.
811void
813{
814 DPRINTF(DistEthernet, "DistIface::startup() started\n");
815 // Start syncing now, except on non-switch nodes that use the pseudo op.
816 if (this == primary && (!syncStartOnPseudoOp || isSwitch))
817 syncEvent->start();
818 DPRINTF(DistEthernet, "DistIface::startup() done\n");
819}
820
// readyToCkpt(delay, period). The signature (source line 822) is missing
// from this listing. In a dist run (primary set), an m5 checkpoint request
// becomes a collective request (delay == 0) or an immediate one
// (otherwise), handled at the next sync, and the function returns false.
// Without a primary it returns true. A non-zero period is ignored.
// NOTE(review): false presumably tells the caller not to checkpoint on its
// own. Confirm against the caller.
821bool
823{
824 bool ret = true;
825 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
826 "period:%lu\n", delay, period);
827 if (primary) {
828 if (delay == 0) {
829 inform("m5 checkpoint called with zero delay => triggering collaborative "
830 "checkpoint\n");
831 sync->requestCkpt(ReqType::collective);
832 } else {
833 inform("m5 checkpoint called with non-zero delay => triggering immediate "
834 "checkpoint (at the next sync)\n");
835 sync->requestCkpt(ReqType::immediate);
836 }
837 if (period != 0)
838 inform("Non-zero period for m5_ckpt is ignored in "
839 "distributed gem5 runs\n");
840 ret = false;
841 }
842 return ret;
843}
844
// Record a stop-sync request to be sent with the next sync request. The
// signature (source line 846) is missing from this listing. Unlike the
// checkpoint/exit requests, a new request simply overwrites the old one.
845void
847{
848 std::lock_guard<std::mutex> sync_lock(lock);
849 needStopSync = req;
850}
851
// toggleSync(tc). The signature (source line 853) is missing from this
// listing. This is the pseudo-op handler that turns periodic dist
// synchronization off (if the sync event is scheduled) or on. In both
// cases it quiesces the active local thread contexts until all nodes reach
// the new state.
852void
854{
855 // Unfortunate that we have to populate the system pointer member this way.
856 primary->sys = tc->getSystemPtr();
857
858 // The invariant for both syncing and "unsyncing" is that all threads will
859 // stop executing instructions until the desired sync state has been reached
860 // for all nodes. This is the easiest way to prevent deadlock (in the case
861 // of "unsyncing") and causality errors (in the case of syncing).
862 if (primary->syncEvent->scheduled()) {
863 inform("Request toggling syncronization off\n");
864 primary->sync->requestStopSync(ReqType::collective);
865
866 // At this point, we have no clue when everyone will reach the sync
867 // stop point. Suspend execution of all local thread contexts.
868 // Dist-gem5 will reactivate all thread contexts when everyone has
869 // reached the sync stop point.
// NOTE(review): the loop variable `tc` shadows the function parameter.
870 for (auto *tc: primary->sys->threads) {
871 if (tc->status() == ThreadContext::Active)
872 tc->quiesce();
873 }
874 } else {
875 inform("Request toggling syncronization on\n");
// NOTE(review): source line 876 is missing. The loop below reads
// syncEvent->when(), so that line presumably (re)starts the sync event.
// Confirm.
877
878 // We need to suspend all CPUs until the sync point is reached by all
879 // nodes to prevent causality errors. We can also schedule CPU
880 // activation here, since we know exactly when the next sync will
881 // occur.
882 for (auto *tc: primary->sys->threads) {
883 if (tc->status() == ThreadContext::Active)
884 tc->quiesceTick(primary->syncEvent->when() + 1);
885 }
886 }
887}
888
// readyToExit(delay). The signature (source line 890) is missing from this
// listing. In a dist run (primary set), an m5 exit becomes a collective
// request (delay == 0) or an immediate one (otherwise), handled at the next
// sync, and the function returns false. Without a primary it returns true.
889bool
891{
892 bool ret = true;
893 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
894 delay);
895 if (primary) {
896 // To successfully coordinate an exit, all nodes must be synchronising
897 if (!primary->syncEvent->scheduled())
// NOTE(review): source line 898, the body of the `if` above, is missing
// from this listing. As shown, the `if` would govern the next statement
// instead. Confirm against the real source.
899
900 if (delay == 0) {
901 inform("m5 exit called with zero delay => triggering collaborative "
902 "exit\n");
903 sync->requestExit(ReqType::collective);
904 } else {
905 inform("m5 exit called with non-zero delay => triggering immediate "
906 "exit (at the next sync)\n");
907 sync->requestExit(ReqType::immediate);
908 }
909 ret = false;
910 }
911 return ret;
912}
913
// rankParam(). The signature (source line 915) is missing from this
// listing. It returns the primary interface's rank, or 0 with a warning
// when no DistIface exists (a non-dist run).
914uint64_t
916{
917 uint64_t val;
918 if (primary) {
919 val = primary->rank;
920 } else {
921 warn("Dist-rank parameter is queried in single gem5 simulation.");
922 val = 0;
923 }
924 return val;
925}
926
// sizeParam(). The signature (source line 928) is missing from this
// listing. It returns the number of gem5 processes from the primary
// interface, or 1 with a warning when no DistIface exists (a non-dist run).
927uint64_t
929{
930 uint64_t val;
931 if (primary) {
932 val = primary->size;
933 } else {
934 warn("Dist-size parameter is queried in single gem5 simulation.");
935 val = 1;
936 }
937 return val;
938}
939
940} // namespace gem5
#define DPRINTF(x,...)
Definition trace.hh:209
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
The global event to schedule periodic dist sync.
void start()
Schedule the first periodic sync event.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void requestExit(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
ReqType needStopSync
Sync stop requested.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
void requestCkpt(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Tick nextRepeat
The repeat value for the next periodic sync.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Definition dist_iface.cc:69
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
virtual void requestStopSync(ReqType req)=0
virtual void requestExit(ReqType req)=0
virtual void requestCkpt(ReqType req)=0
std::mutex lock
The lock to protect access to the Sync object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
Definition dist_iface.cc:87
static uint64_t rankParam()
Getter for the dist rank param.
virtual void initTransport()=0
Init hook for the underlying transport.
RecvScheduler recvScheduler
Meta information about data packets received.
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void init(const Event *e, Tick link_delay)
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
Random::RandomPtr rng
static uint64_t sizeParam()
Getter for the dist size param.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects. These are the SimObjects that are pa...
std::thread * recvThread
Receiver thread pointer.
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
void drainResume() override
Resume execution after a successful drain.
virtual ~DistIface()
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
static bool isSwitch
Is this node a switch?
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
Tick syncRepeat
Frequency of dist sync events in ticks.
Tick syncStart
Tick to schedule the first dist sync event.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
unsigned size
The number of gem5 processes comprising this dist simulation.
void lock()
Provide an interface for locking/unlocking the event queue.
Definition eventq.hh:947
Threads threads
Definition system.hh:310
ThreadContext is the external interface to all thread state for anything outside of the CPU.
virtual System * getSystemPtr()=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
void quiesce()
Quiesce thread context.
@ Suspended
Temporarily inactive.
virtual Status status() const =0
STL vector class.
Definition stl.hh:37
DrainState
Object drain/handover states.
Definition drain.hh:75
@ Drained
Buffers drained, ready for serialization/handover.
#define panic(...)
This implements a cprintf based panic() function.
Definition logging.hh:188
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Definition logging.hh:214
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
Definition serialize.cc:74
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize a child object.
Definition serialize.cc:81
#define warn(...)
Definition logging.hh:256
#define warn_once(...)
Definition logging.hh:260
#define inform(...)
Definition logging.hh:257
Bitfield< 28 > v
Definition misc_types.hh:54
Bitfield< 7 > i
Definition misc_types.hh:67
Bitfield< 9 > d
Definition misc_types.hh:64
Bitfield< 20 > sr
Bitfield< 63 > val
Definition misc.hh:804
Bitfield< 2 > em
Definition misc.hh:617
Bitfield< 5 > lock
Definition types.hh:82
Copyright (c) 2024 Arm Limited All rights reserved.
Definition binary32.hh:36
Tick curTick()
The universal simulation clock.
Definition cur_tick.hh:46
std::ostream CheckpointOut
Definition serialize.hh:66
uint64_t Tick
Tick count type.
Definition types.hh:58
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
Definition sim_events.cc:88
std::string csprintf(const char *format, const Args &...args)
Definition cprintf.hh:161
EventQueue * curEventQueue()
Definition eventq.hh:91
std::shared_ptr< EthPacketData > EthPacketPtr
Definition etherpkt.hh:90
output header
Definition nop.cc:36
#define UNSERIALIZE_SCALAR(scalar)
Definition serialize.hh:575
#define SERIALIZE_SCALAR(scalar)
Definition serialize.hh:568
MsgType msgType
The msg type field is valid for all header packets.
Received packet descriptor.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.

Generated on Mon Jan 13 2025 04:28:34 for gem5 by doxygen 1.9.8