gem5 v24.0.0.0
Loading...
Searching...
No Matches
dist_iface.cc
Go to the documentation of this file.
1/*
2 * Copyright (c) 2015-2016 ARM Limited
3 * All rights reserved
4 *
5 * The license below extends only to copyright in the software and shall
6 * not be construed as granting a license to any other intellectual
7 * property including but not limited to intellectual property relating
8 * to a hardware implementation of the functionality of the software
9 * licensed hereunder. You may use the software subject to the license
10 * terms below provided that you ensure that this notice is replicated
11 * unmodified and in its entirety in all distributions of the software,
12 * modified or unmodified, in source code or in binary form.
13 *
14 * Redistribution and use in source and binary forms, with or without
15 * modification, are permitted provided that the following conditions are
16 * met: redistributions of source code must retain the above copyright
17 * notice, this list of conditions and the following disclaimer;
18 * redistributions in binary form must reproduce the above copyright
19 * notice, this list of conditions and the following disclaimer in the
20 * documentation and/or other materials provided with the distribution;
21 * neither the name of the copyright holders nor the names of its
22 * contributors may be used to endorse or promote products derived from
23 * this software without specific prior written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 */
37
38/* @file
39 * The interface class for dist-gem5 simulations.
40 */
41
42#include "dev/net/dist_iface.hh"
43
44#include <queue>
45#include <thread>
46
47#include "base/random.hh"
48#include "base/trace.hh"
49#include "cpu/thread_context.hh"
50#include "debug/DistEthernet.hh"
51#include "debug/DistEthernetPkt.hh"
52#include "dev/net/etherpkt.hh"
53#include "sim/cur_tick.hh"
54#include "sim/sim_exit.hh"
55#include "sim/sim_object.hh"
56#include "sim/system.hh"
57
58namespace gem5
59{
60
61DistIface::Sync *DistIface::sync = nullptr;
62System *DistIface::sys = nullptr;
63DistIface::SyncEvent *DistIface::syncEvent = nullptr;
64unsigned DistIface::distIfaceNum = 0;
65unsigned DistIface::recvThreadsNum = 0;
66DistIface *DistIface::primary = nullptr;
67bool DistIface::isSwitch = false;
68
69void
70DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
71{
72 if (start_tick < nextAt) {
73 nextAt = start_tick;
74 inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
75 }
76
77 if (repeat_tick == 0)
78 panic("Dist synchronisation interval must be greater than zero");
79
80 if (repeat_tick < nextRepeat) {
81 nextRepeat = repeat_tick;
82 inform("Dist synchronisation interval is changed to %lu.\n",
84 }
85}
86
87void
89{
90 std::unique_lock<std::mutex> sync_lock(lock);
91 waitNum = 0;
92 isAbort = true;
93 sync_lock.unlock();
94 cv.notify_one();
95}
96
98{
99 numNodes = num_nodes;
100 waitNum = num_nodes;
101 numExitReq = 0;
102 numCkptReq = 0;
103 numStopSyncReq = 0;
104 doExit = false;
105 doCkpt = false;
106 doStopSync = false;
107 nextAt = std::numeric_limits<Tick>::max();
108 nextRepeat = std::numeric_limits<Tick>::max();
109 isAbort = false;
110}
111
113{
114 waitNum = 0;
115 needExit = ReqType::none;
116 needCkpt = ReqType::none;
117 needStopSync = ReqType::none;
118 doExit = false;
119 doCkpt = false;
120 doStopSync = false;
121 nextAt = std::numeric_limits<Tick>::max();
122 nextRepeat = std::numeric_limits<Tick>::max();
123 isAbort = false;
124}
125
126bool
128{
129 std::unique_lock<std::mutex> sync_lock(lock);
131
132 assert(waitNum == 0);
133 assert(!isAbort);
135 // initiate the global synchronisation
136 header.msgType = MsgType::cmdSyncReq;
137 header.sendTick = curTick();
138 header.syncRepeat = nextRepeat;
139 header.needCkpt = needCkpt;
140 header.needStopSync = needStopSync;
141 if (needCkpt != ReqType::none)
142 needCkpt = ReqType::pending;
143 header.needExit = needExit;
144 if (needExit != ReqType::none)
145 needExit = ReqType::pending;
146 if (needStopSync != ReqType::none)
147 needStopSync = ReqType::pending;
149 // now wait until all receiver threads complete the synchronisation
150 auto lf = [this]{ return waitNum == 0; };
151 cv.wait(sync_lock, lf);
152 // global synchronisation is done.
153 assert(isAbort || !same_tick || (nextAt == curTick()));
154 return !isAbort;
155}
156
157
158bool
160{
161 std::unique_lock<std::mutex> sync_lock(lock);
163 // Wait for the sync requests from the nodes
164 if (waitNum > 0) {
165 auto lf = [this]{ return waitNum == 0; };
166 cv.wait(sync_lock, lf);
167 }
168 assert(waitNum == 0);
169 if (isAbort) // sync aborted
170 return false;
171 assert(!same_tick || (nextAt == curTick()));
172 waitNum = numNodes;
173 // Complete the global synchronisation
174 header.msgType = MsgType::cmdSyncAck;
175 header.sendTick = nextAt;
176 header.syncRepeat = nextRepeat;
177 if (doCkpt || numCkptReq == numNodes) {
178 doCkpt = true;
179 header.needCkpt = ReqType::immediate;
180 numCkptReq = 0;
181 } else {
182 header.needCkpt = ReqType::none;
183 }
184 if (doExit || numExitReq == numNodes) {
185 doExit = true;
186 header.needExit = ReqType::immediate;
187 } else {
188 header.needExit = ReqType::none;
189 }
190 if (doStopSync || numStopSyncReq == numNodes) {
191 doStopSync = true;
192 numStopSyncReq = 0;
193 header.needStopSync = ReqType::immediate;
194 } else {
195 header.needStopSync = ReqType::none;
196 }
198 return true;
199}
200
201bool
203 Tick sync_repeat,
204 ReqType need_ckpt,
205 ReqType need_exit,
206 ReqType need_stop_sync)
207{
208 std::unique_lock<std::mutex> sync_lock(lock);
209 if (isAbort) // sync aborted
210 return false;
211 assert(waitNum > 0);
212
213 if (send_tick > nextAt)
214 nextAt = send_tick;
215 if (nextRepeat > sync_repeat)
216 nextRepeat = sync_repeat;
217
218 if (need_ckpt == ReqType::collective)
219 numCkptReq++;
220 else if (need_ckpt == ReqType::immediate)
221 doCkpt = true;
222 if (need_exit == ReqType::collective)
223 numExitReq++;
224 else if (need_exit == ReqType::immediate)
225 doExit = true;
226 if (need_stop_sync == ReqType::collective)
227 numStopSyncReq++;
228 else if (need_stop_sync == ReqType::immediate)
229 doStopSync = true;
230
231 waitNum--;
232 // Notify the simulation thread if the on-going sync is complete
233 if (waitNum == 0) {
234 sync_lock.unlock();
235 cv.notify_one();
236 }
237 // The receive thread must keep alive in the switch until the node
238 // closes the connection. Thus, we always return true here.
239 return true;
240}
241
242bool
244 Tick next_repeat,
245 ReqType do_ckpt,
246 ReqType do_exit,
247 ReqType do_stop_sync)
248{
249 std::unique_lock<std::mutex> sync_lock(lock);
250 if (isAbort) // sync aborted
251 return false;
252 assert(waitNum > 0);
253
254 nextAt = max_send_tick;
255 nextRepeat = next_repeat;
256 doCkpt = (do_ckpt != ReqType::none);
257 doExit = (do_exit != ReqType::none);
258 doStopSync = (do_stop_sync != ReqType::none);
259
260 waitNum--;
261 // Notify the simulation thread if the on-going sync is complete
262 if (waitNum == 0) {
263 sync_lock.unlock();
264 cv.notify_one();
265 }
266 // The receive thread must finish when simulation is about to exit
267 return !doExit;
268}
269
270void
272{
273 std::lock_guard<std::mutex> sync_lock(lock);
274 assert(req != ReqType::none);
275 if (needCkpt != ReqType::none)
276 warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
277 if (needCkpt == ReqType::none || req == ReqType::immediate)
278 needCkpt = req;
279}
280
281void
283{
284 std::lock_guard<std::mutex> sync_lock(lock);
285 assert(req != ReqType::none);
286 if (needExit != ReqType::none)
287 warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
288 if (needExit == ReqType::none || req == ReqType::immediate)
289 needExit = req;
290}
291
292void
294{
295 if (doCkpt) {
296 // The first DistIface object called this right before writing the
297 // checkpoint. We need to drain the underlying physical network here.
298 // Note that other gem5 peers may enter this barrier at different
299 // ticks due to draining.
300 run(false);
301 // Only the "first" DistIface object has to perform the sync
302 doCkpt = false;
303 }
304}
305
306void
308{
309 int need_exit = static_cast<int>(needExit);
310 SERIALIZE_SCALAR(need_exit);
311}
312
313void
315{
316 int need_exit;
317 UNSERIALIZE_SCALAR(need_exit);
318 needExit = static_cast<ReqType>(need_exit);
319}
320
321void
326
327void
332
333void
335{
336 // Note that this may be called either from startup() or drainResume()
337
338 // At this point, all DistIface objects have already called Sync::init() so
339 // we have a local minimum of the start tick and repeat for the periodic
340 // sync.
341 repeat = DistIface::sync->nextRepeat;
342 // Do a global barrier to agree on a common repeat value (the smallest
343 // one from all participating nodes).
344 if (!DistIface::sync->run(false))
345 panic("DistIface::SyncEvent::start() aborted\n");
346
347 assert(!DistIface::sync->doCkpt);
348 assert(!DistIface::sync->doExit);
349 assert(!DistIface::sync->doStopSync);
350 assert(DistIface::sync->nextAt >= curTick());
351 assert(DistIface::sync->nextRepeat <= repeat);
352
353 if (curTick() == 0)
354 assert(!scheduled());
355
356 // Use the maximum of the current tick for all participating nodes or a
357 // user provided starting tick.
358 if (scheduled())
359 reschedule(DistIface::sync->nextAt);
360 else
361 schedule(DistIface::sync->nextAt);
362
363 inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
364 DistIface::sync->nextRepeat);
365}
366
367void
369{
370 // We may not start a global periodic sync while draining before taking a
371 // checkpoint. This is due to the possibility that peer gem5 processes
372 // may not hit the same periodic sync before they complete draining and
373 // that would make this periodic sync clash with sync called from
374 // DistIface::serialize() by other gem5 processes.
375 // We would need a 'distributed drain' solution to eliminate this
376 // restriction.
377 // Note that if draining was not triggered by checkpointing then we are
378 // fine since no extra global sync will happen (i.e. all peer gem5 will
379 // hit this periodic sync eventually).
380 panic_if(_draining && DistIface::sync->doCkpt,
381 "Distributed sync is hit while draining");
382 /*
383 * Note that this is a global event so this process method will be called
384 * by only exactly one thread.
385 */
386 /*
387 * We hold the eventq lock at this point but the receiver thread may
388 * need the lock to schedule new recv events while waiting for the
389 * dist sync to complete.
390 * Note that the other simulation threads also release their eventq
391 * locks while waiting for us due to the global event semantics.
392 */
393 {
395 // we do a global sync here that is supposed to happen at the same
396 // tick in all gem5 peers
397 if (!DistIface::sync->run(true))
398 return; // global sync aborted
399 // global sync completed
400 }
401 if (DistIface::sync->doCkpt)
402 exitSimLoop("checkpoint");
403 if (DistIface::sync->doExit) {
404 exitSimLoop("exit request from gem5 peers");
405 return;
406 }
407 if (DistIface::sync->doStopSync) {
409 inform("synchronization disabled at %lu\n", curTick());
410
411 // The switch node needs to wait for the next sync immediately.
413 start();
414 } else {
415 // Wake up thread contexts on non-switch nodes.
416 for (auto *tc: primary->sys->threads) {
417 if (tc->status() == ThreadContext::Suspended)
418 tc->activate();
419 else
420 warn_once("Tried to wake up thread in dist-gem5, but it "
421 "was already awake!\n");
422 }
423 }
424 return;
425 }
426 // schedule the next periodic sync
427 repeat = DistIface::sync->nextRepeat;
428 schedule(curTick() + repeat);
429}
430
431void
433{
434 // This is called from the receiver thread when it starts running. The new
435 // receiver thread shares the event queue with the simulation thread
436 // (associated with the simulated Ethernet link).
437 curEventQueue(eventManager->eventQueue());
438
439 recvDone = recv_done;
440 linkDelay = link_delay;
441}
442
443Tick
445 Tick send_delay,
446 Tick prev_recv_tick)
447{
448 Tick recv_tick = send_tick + send_delay + linkDelay;
449 // sanity check (we need atleast a send delay long window)
450 assert(recv_tick >= prev_recv_tick + send_delay);
451 panic_if(prev_recv_tick + send_delay > recv_tick,
452 "Receive window is smaller than send delay");
453 panic_if(recv_tick <= curTick(),
454 "Simulators out of sync - missed packet receive by %llu ticks"
455 "(rev_recv_tick: %lu send_tick: %lu send_delay: %lu "
456 "linkDelay: %lu )",
457 curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
458 linkDelay);
459
460 return recv_tick;
461}
462
463void
465{
466 // Schedule pending packets asap in case link speed/delay changed when
467 // restoring from the checkpoint.
468 // This may be done during unserialize except that curTick() is unknown
469 // so we call this during drainResume().
470 // If we are not restoring from a checkpoint then link latency could not
471 // change so we just return.
472 if (!ckptRestore)
473 return;
474
476 while (!descQueue.empty()) {
477 Desc d = descQueue.front();
478 descQueue.pop();
479 d.sendTick = curTick();
480 d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
481 v.push_back(d);
482 }
483
484 for (auto &d : v)
485 descQueue.push(d);
486
487 if (recvDone->scheduled()) {
488 assert(!descQueue.empty());
489 eventManager->reschedule(recvDone, curTick());
490 } else {
491 assert(descQueue.empty() && v.empty());
492 }
493 ckptRestore = false;
494}
495
496void
498 Tick send_tick,
499 Tick send_delay)
500{
501 // Note : this is called from the receiver thread
502 curEventQueue()->lock();
503 Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
504
505 DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
506 "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
507 send_tick, send_delay, linkDelay, recv_tick);
508 // Every packet must be sent and arrive in the same quantum
509 assert(send_tick > primary->syncEvent->when() -
511 // No packet may be scheduled for receive in the arrival quantum
512 assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
513
514 // Now we are about to schedule a recvDone event for the new data packet.
515 // We use the same recvDone object for all incoming data packets. Packet
516 // descriptors are saved in the ordered queue. The currently scheduled
517 // packet is always on the top of the queue.
518 // NOTE: we use the event queue lock to protect the receive desc queue,
519 // too, which is accessed both by the receiver thread and the simulation
520 // thread.
521 descQueue.emplace(new_packet, send_tick, send_delay);
522 if (descQueue.size() == 1) {
523 assert(!recvDone->scheduled());
524 eventManager->schedule(recvDone, recv_tick);
525 } else {
526 assert(recvDone->scheduled());
527 panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
528 "Out of order packet received (recv_tick: %lu top(): %lu\n",
529 recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
530 }
532}
533
536{
537 // Note : this is called from the simulation thread when a receive done
538 // event is being processed for the link. We assume that the thread holds
539 // the event queue lock when this is called!
540 EthPacketPtr next_packet = descQueue.front().packet;
541 descQueue.pop();
542
543 if (descQueue.size() > 0) {
544 Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
545 descQueue.front().sendDelay,
546 curTick());
547 eventManager->schedule(recvDone, recv_tick);
548 }
549 prevRecvTick = curTick();
550 return next_packet;
551}
552
553void
555{
556 SERIALIZE_SCALAR(sendTick);
557 SERIALIZE_SCALAR(sendDelay);
558 packet->serialize("rxPacket", cp);
559}
560
561void
563{
564 UNSERIALIZE_SCALAR(sendTick);
565 UNSERIALIZE_SCALAR(sendDelay);
566 packet = std::make_shared<EthPacketData>();
567 packet->unserialize("rxPacket", cp);
568}
569
570void
572{
573 SERIALIZE_SCALAR(prevRecvTick);
574 // serialize the receive desc queue
575 std::queue<Desc> tmp_queue(descQueue);
576 unsigned n_desc_queue = descQueue.size();
577 assert(tmp_queue.size() == descQueue.size());
578 SERIALIZE_SCALAR(n_desc_queue);
579 for (int i = 0; i < n_desc_queue; i++) {
580 tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
581 tmp_queue.pop();
582 }
583 assert(tmp_queue.empty());
584}
585
586void
588{
589 assert(descQueue.size() == 0);
590 assert(!recvDone->scheduled());
591 assert(!ckptRestore);
592
593 UNSERIALIZE_SCALAR(prevRecvTick);
594 // unserialize the receive desc queue
595 unsigned n_desc_queue;
596 UNSERIALIZE_SCALAR(n_desc_queue);
597 for (int i = 0; i < n_desc_queue; i++) {
598 Desc recv_desc;
599 recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
600 descQueue.push(recv_desc);
601 }
602 ckptRestore = true;
603}
604
605DistIface::DistIface(unsigned dist_rank,
606 unsigned dist_size,
607 Tick sync_start,
608 Tick sync_repeat,
610 bool use_pseudo_op,
611 bool is_switch, int num_nodes) :
612 syncStart(sync_start), syncRepeat(sync_repeat),
613 recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
614 rank(dist_rank), size(dist_size)
615{
616 DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
617 isPrimary = false;
618 if (primary == nullptr) {
619 assert(sync == nullptr);
620 assert(syncEvent == nullptr);
621 isSwitch = is_switch;
622 if (is_switch)
623 sync = new SyncSwitch(num_nodes);
624 else
625 sync = new SyncNode();
626 syncEvent = new SyncEvent();
627 primary = this;
628 isPrimary = true;
629 }
631 distIfaceNum++;
632}
633
635{
636 assert(recvThread);
637 recvThread->join();
638 delete recvThread;
639 if (distIfaceNum-- == 0) {
640 assert(syncEvent);
641 delete syncEvent;
642 assert(sync);
643 delete sync;
644 }
645 if (this == primary)
646 primary = nullptr;
647}
648
649void
651{
653
654 // Prepare a dist header packet for the Ethernet packet we want to
655 // send out.
656 header.msgType = MsgType::dataDescriptor;
657 header.sendTick = curTick();
658 header.sendDelay = send_delay;
659
660 header.dataPacketLength = pkt->length;
661 header.simLength = pkt->simLength;
662
663 // Send out the packet and the meta info.
664 sendPacket(header, pkt);
665
666 DPRINTF(DistEthernetPkt,
667 "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
668 pkt->length, send_delay);
669}
670
671void
672DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
673{
674 EthPacketPtr new_packet;
676
677 // Initialize receive scheduler parameters
678 recvScheduler.init(recv_done, link_delay);
679
680 // Main loop to wait for and process any incoming message.
681 for (;;) {
682 // recvHeader() blocks until the next dist header packet comes in.
683 if (!recvHeader(header)) {
684 // We lost connection to the peer gem5 processes most likely
685 // because one of them called m5 exit. So we stop here.
686 // Grab the eventq lock to stop the simulation thread
687 curEventQueue()->lock();
688 exitSimLoop("connection to gem5 peer got closed");
690 // The simulation thread may be blocked in processing an on-going
691 // global synchronisation. Abort the sync to give the simulation
692 // thread a chance to make progress and process the exit event.
693 sync->abort();
694 // Finish receiver thread
695 break;
696 }
697
698 // We got a valid dist header packet, let's process it
699 if (header.msgType == MsgType::dataDescriptor) {
700 recvPacket(header, new_packet);
701 recvScheduler.pushPacket(new_packet,
702 header.sendTick,
703 header.sendDelay);
704 } else {
705 // everything else must be synchronisation related command
706 if (!sync->progress(header.sendTick,
707 header.syncRepeat,
708 header.needCkpt,
709 header.needExit,
710 header.needStopSync))
711 // Finish receiver thread if simulation is about to exit
712 break;
713 }
714 }
715}
716
717void
718DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
719{
720 assert(recvThread == nullptr);
721
722 recvThread = new std::thread(&DistIface::recvThreadFunc,
723 this,
724 const_cast<Event *>(recv_done),
725 link_delay);
727}
728
731{
732 DPRINTF(DistEthernet,"DistIFace::drain() called\n");
733 // This can be called multiple times in the same drain cycle.
734 if (this == primary)
735 syncEvent->draining(true);
736 return DrainState::Drained;
737}
738
739void
741 DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
742 if (this == primary)
743 syncEvent->draining(false);
745}
746
747void
749{
750 // Drain the dist interface before the checkpoint is taken. We cannot call
751 // this as part of the normal drain cycle because this dist sync has to be
752 // called exactly once after the system is fully drained.
754
755 unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
756
757 SERIALIZE_SCALAR(rank_orig);
758 SERIALIZE_SCALAR(dist_iface_id_orig);
759
760 recvScheduler.serializeSection(cp, "recvScheduler");
761 if (this == primary) {
762 sync->serializeSection(cp, "Sync");
763 }
764}
765
766void
768{
769 unsigned rank_orig, dist_iface_id_orig;
770 UNSERIALIZE_SCALAR(rank_orig);
771 UNSERIALIZE_SCALAR(dist_iface_id_orig);
772
773 panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
774 rank, rank_orig);
775 panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
776 "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
777 dist_iface_id_orig);
778
779 recvScheduler.unserializeSection(cp, "recvScheduler");
780 if (this == primary) {
781 sync->unserializeSection(cp, "Sync");
782 }
783}
784
785void
786DistIface::init(const Event *done_event, Tick link_delay)
787{
788 // Init hook for the underlying message transport to set up/finalize
789 // communication channels
791
792 // Spawn a new receiver thread that will process messages
793 // coming in from peer gem5 processes.
794 // The receive thread will also schedule a (receive) doneEvent
795 // for each incoming data packet.
796 spawnRecvThread(done_event, link_delay);
797
798
799 // Adjust the periodic sync start and interval. Different DistIface
800 // might have different requirements. The singleton sync object
801 // will select the minimum values for both params.
802 assert(sync != nullptr);
804
805 // Initialize the seed for random generator to avoid the same sequence
806 // in all gem5 peer processes
807 assert(primary != nullptr);
808 if (this == primary)
809 random_mt.init(5489 * (rank+1) + 257);
810}
811
812void
814{
815 DPRINTF(DistEthernet, "DistIface::startup() started\n");
816 // Schedule synchronization unless we are not a switch in pseudo_op mode.
817 if (this == primary && (!syncStartOnPseudoOp || isSwitch))
818 syncEvent->start();
819 DPRINTF(DistEthernet, "DistIface::startup() done\n");
820}
821
822bool
824{
825 bool ret = true;
826 DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
827 "period:%lu\n", delay, period);
828 if (primary) {
829 if (delay == 0) {
830 inform("m5 checkpoint called with zero delay => triggering collaborative "
831 "checkpoint\n");
832 sync->requestCkpt(ReqType::collective);
833 } else {
834 inform("m5 checkpoint called with non-zero delay => triggering immediate "
835 "checkpoint (at the next sync)\n");
836 sync->requestCkpt(ReqType::immediate);
837 }
838 if (period != 0)
839 inform("Non-zero period for m5_ckpt is ignored in "
840 "distributed gem5 runs\n");
841 ret = false;
842 }
843 return ret;
844}
845
846void
848{
849 std::lock_guard<std::mutex> sync_lock(lock);
850 needStopSync = req;
851}
852
853void
855{
856 // Unfortunate that we have to populate the system pointer member this way.
857 primary->sys = tc->getSystemPtr();
858
859 // The invariant for both syncing and "unsyncing" is that all threads will
860 // stop executing instructions until the desired sync state has been reached
861 // for all nodes. This is the easiest way to prevent deadlock (in the case
862 // of "unsyncing") and causality errors (in the case of syncing).
863 if (primary->syncEvent->scheduled()) {
864 inform("Request toggling syncronization off\n");
865 primary->sync->requestStopSync(ReqType::collective);
866
867 // At this point, we have no clue when everyone will reach the sync
868 // stop point. Suspend execution of all local thread contexts.
869 // Dist-gem5 will reactivate all thread contexts when everyone has
870 // reached the sync stop point.
871 for (auto *tc: primary->sys->threads) {
872 if (tc->status() == ThreadContext::Active)
873 tc->quiesce();
874 }
875 } else {
876 inform("Request toggling syncronization on\n");
878
879 // We need to suspend all CPUs until the sync point is reached by all
880 // nodes to prevent causality errors. We can also schedule CPU
881 // activation here, since we know exactly when the next sync will
882 // occur.
883 for (auto *tc: primary->sys->threads) {
884 if (tc->status() == ThreadContext::Active)
885 tc->quiesceTick(primary->syncEvent->when() + 1);
886 }
887 }
888}
889
890bool
892{
893 bool ret = true;
894 DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
895 delay);
896 if (primary) {
897 // To successfully coordinate an exit, all nodes must be synchronising
898 if (!primary->syncEvent->scheduled())
900
901 if (delay == 0) {
902 inform("m5 exit called with zero delay => triggering collaborative "
903 "exit\n");
904 sync->requestExit(ReqType::collective);
905 } else {
906 inform("m5 exit called with non-zero delay => triggering immediate "
907 "exit (at the next sync)\n");
908 sync->requestExit(ReqType::immediate);
909 }
910 ret = false;
911 }
912 return ret;
913}
914
915uint64_t
917{
918 uint64_t val;
919 if (primary) {
920 val = primary->rank;
921 } else {
922 warn("Dist-rank parameter is queried in single gem5 simulation.");
923 val = 0;
924 }
925 return val;
926}
927
928uint64_t
930{
931 uint64_t val;
932 if (primary) {
933 val = primary->size;
934 } else {
935 warn("Dist-size parameter is queried in single gem5 simulation.");
936 val = 1;
937 }
938 return val;
939}
940
941} // namespace gem5
#define DPRINTF(x,...)
Definition trace.hh:210
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void init(Event *recv_done, Tick link_delay)
Initialize network link parameters.
void resumeRecvTicks()
Adjust receive ticks for pending packets when restoring from a checkpoint.
EthPacketPtr popPacket()
Fetch the next packet that is to be received by the simulated network link.
void pushPacket(EthPacketPtr new_packet, Tick send_tick, Tick send_delay)
Push a newly arrived packet into the desc queue.
Tick calcReceiveTick(Tick send_tick, Tick send_delay, Tick prev_recv_tick)
Calculate the tick to schedule the next receive done event.
The global event to schedule periodic dist sync.
void start()
Schedule the first periodic sync event.
void process() override
This is a global event so process() will only be called by exactly one simulation thread.
void requestExit(ReqType req) override
void serialize(CheckpointOut &cp) const override
Serialize an object.
ReqType needStopSync
Sync stop requested.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
void requestCkpt(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
void requestStopSync(ReqType req) override
bool run(bool same_tick) override
Core method to perform a full dist sync.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
bool progress(Tick max_req_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync) override
Callback when the receiver thread gets a sync ack message.
Tick nextRepeat
The repeat value for the next periodic sync.
bool doStopSync
Flag is set if sync is to stop upon sync completion.
void init(Tick start, Tick repeat)
Initialize periodic sync params.
Definition dist_iface.cc:70
Tick nextAt
Tick for the next periodic sync (if the event is not scheduled yet)
virtual void requestStopSync(ReqType req)=0
virtual void requestExit(ReqType req)=0
virtual void requestCkpt(ReqType req)=0
std::mutex lock
The lock to protect access to the Sync object.
virtual bool progress(Tick send_tick, Tick next_repeat, ReqType do_ckpt, ReqType do_exit, ReqType do_stop_sync)=0
Callback when the receiver thread gets a sync ack message.
void abort()
Abort processing an on-going sync event (in case of an error, e.g.
Definition dist_iface.cc:88
static uint64_t rankParam()
Getter for the dist rank param.
virtual void initTransport()=0
Init hook for the underlaying transport.
RecvScheduler recvScheduler
Meta information about data packets received.
static bool readyToExit(Tick delay)
Initiate the exit from the simulation.
bool syncStartOnPseudoOp
Use pseudoOp to start synchronization.
static void toggleSync(ThreadContext *tc)
Trigger the primary to start/stop synchronization.
static bool readyToCkpt(Tick delay, Tick period)
Initiate taking a checkpoint.
void init(const Event *e, Tick link_delay)
unsigned rank
The rank of this process among the gem5 peers.
unsigned distIfaceId
Unique id for the dist link.
static DistIface * primary
The very first DistIface object created becomes the primary interface.
static uint64_t sizeParam()
Getter for the dist size param.
DrainState drain() override
Draining is the process of clearing out the states of SimObjects.These are the SimObjects that are pa...
std::thread * recvThread
Receiver thread pointer.
static SyncEvent * syncEvent
The singleton SyncEvent object to schedule periodic dist sync.
void unserialize(CheckpointIn &cp) override
Unserialize an object.
static System * sys
System pointer used to wakeup sleeping threads when stopping sync.
void recvThreadFunc(Event *recv_done, Tick link_delay)
The function executed by a receiver thread.
virtual void sendCmd(const Header &header)=0
Send out a control command to the remote end.
virtual bool recvHeader(Header &header)=0
Receive a header (i.e.
static Sync * sync
The singleton Sync object to perform dist synchronisation.
virtual void sendPacket(const Header &header, const EthPacketPtr &packet)=0
Send out a data packet to the remote end.
static unsigned distIfaceNum
Number of DistIface objects (i.e.
void drainResume() override
Resume execution after a successful drain.
virtual ~DistIface()
void spawnRecvThread(const Event *recv_done, Tick link_delay)
spawn the receiver thread.
static bool isSwitch
Is this node a switch?
void serialize(CheckpointOut &cp) const override
Serialize an object.
virtual void recvPacket(const Header &header, EthPacketPtr &packet)=0
Receive a packet from the remote end.
Tick syncRepeat
Frequency of dist sync events in ticks.
Tick syncStart
Tick to schedule the first dist sync event.
static unsigned recvThreadsNum
Number of receiver threads (in this gem5 process)
DistIface(unsigned dist_rank, unsigned dist_size, Tick sync_start, Tick sync_repeat, EventManager *em, bool use_pseudo_op, bool is_switch, int num_nodes)
ctor
void packetOut(EthPacketPtr pkt, Tick send_delay)
Send out an Ethernet packet.
unsigned size
The number of gem5 processes comprising this dist simulation.
void lock()
Provide an interface for locking/unlocking the event queue.
Definition eventq.hh:947
void init(uint32_t s)
Definition random.cc:67
Threads threads
Definition system.hh:310
ThreadContext is the external interface to all thread state for anything outside of the CPU.
virtual System * getSystemPtr()=0
void quiesceTick(Tick resume)
Quiesce, suspend, and schedule activate at resume.
void quiesce()
Quiesce thread context.
@ Suspended
Temporarily inactive.
virtual Status status() const =0
STL vector class.
Definition stl.hh:37
Random random_mt
Definition random.cc:99
DrainState
Object drain/handover states.
Definition drain.hh:75
@ Drained
Buffers drained, ready for serialization/handover.
#define panic(...)
This implements a cprintf based panic() function.
Definition logging.hh:188
#define panic_if(cond,...)
Conditional panic macro that checks the supplied condition and only panics if the condition is true a...
Definition logging.hh:214
void serializeSection(CheckpointOut &cp, const char *name) const
Serialize an object into a new section.
Definition serialize.cc:74
void unserializeSection(CheckpointIn &cp, const char *name)
Unserialize an a child object.
Definition serialize.cc:81
#define warn(...)
Definition logging.hh:256
#define warn_once(...)
Definition logging.hh:260
#define inform(...)
Definition logging.hh:257
Bitfield< 28 > v
Definition misc_types.hh:54
Bitfield< 7 > i
Definition misc_types.hh:67
Bitfield< 9 > d
Definition misc_types.hh:64
Bitfield< 20 > sr
Bitfield< 63 > val
Definition misc.hh:804
Bitfield< 2 > em
Definition misc.hh:617
Bitfield< 5 > lock
Definition types.hh:82
Copyright (c) 2024 - Pranith Kumar Copyright (c) 2020 Inria All rights reserved.
Definition binary32.hh:36
Tick curTick()
The universal simulation clock.
Definition cur_tick.hh:46
std::ostream CheckpointOut
Definition serialize.hh:66
uint64_t Tick
Tick count type.
Definition types.hh:58
void exitSimLoop(const std::string &message, int exit_code, Tick when, Tick repeat, bool serialize)
Schedule an event to exit the simulation loop (returning to Python) at the end of the current cycle (...
Definition sim_events.cc:88
std::string csprintf(const char *format, const Args &...args)
Definition cprintf.hh:161
EventQueue * curEventQueue()
Definition eventq.hh:91
std::shared_ptr< EthPacketData > EthPacketPtr
Definition etherpkt.hh:90
output header
Definition nop.cc:36
#define UNSERIALIZE_SCALAR(scalar)
Definition serialize.hh:575
#define SERIALIZE_SCALAR(scalar)
Definition serialize.hh:568
MsgType msgType
The msg type field is valid for all header packets.
Received packet descriptor.
void serialize(CheckpointOut &cp) const override
Serialize an object.
void unserialize(CheckpointIn &cp) override
Unserialize an object.

Generated on Tue Jun 18 2024 16:24:03 for gem5 by doxygen 1.11.0