gem5  v21.0.1.0
dist_iface.cc
1 /*
2  * Copyright (c) 2015-2016 ARM Limited
3  * All rights reserved
4  *
5  * The license below extends only to copyright in the software and shall
6  * not be construed as granting a license to any other intellectual
7  * property including but not limited to intellectual property relating
8  * to a hardware implementation of the functionality of the software
9  * licensed hereunder. You may use the software subject to the license
10  * terms below provided that you ensure that this notice is replicated
11  * unmodified and in its entirety in all distributions of the software,
12  * modified or unmodified, in source code or in binary form.
13  *
14  * Redistribution and use in source and binary forms, with or without
15  * modification, are permitted provided that the following conditions are
16  * met: redistributions of source code must retain the above copyright
17  * notice, this list of conditions and the following disclaimer;
18  * redistributions in binary form must reproduce the above copyright
19  * notice, this list of conditions and the following disclaimer in the
20  * documentation and/or other materials provided with the distribution;
21  * neither the name of the copyright holders nor the names of its
22  * contributors may be used to endorse or promote products derived from
23  * this software without specific prior written permission.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  */
37 
38 /* @file
39  * The interface class for dist-gem5 simulations.
40  */
41 
42 #include "dev/net/dist_iface.hh"
43 
44 #include <queue>
45 #include <thread>
46 
47 #include "base/random.hh"
48 #include "base/trace.hh"
49 #include "cpu/thread_context.hh"
50 #include "debug/DistEthernet.hh"
51 #include "debug/DistEthernetPkt.hh"
52 #include "dev/net/etherpkt.hh"
53 #include "sim/sim_exit.hh"
54 #include "sim/sim_object.hh"
55 #include "sim/system.hh"
56 
57 DistIface::Sync *DistIface::sync = nullptr;
58 System *DistIface::sys = nullptr;
59 DistIface::SyncEvent *DistIface::syncEvent = nullptr;
60 unsigned DistIface::distIfaceNum = 0;
61 unsigned DistIface::recvThreadsNum = 0;
62 DistIface *DistIface::primary = nullptr;
63 bool DistIface::isSwitch = false;
64 
65 void
66 DistIface::Sync::init(Tick start_tick, Tick repeat_tick)
67 {
68  if (start_tick < nextAt) {
69  nextAt = start_tick;
70  inform("Next dist synchronisation tick is changed to %lu.\n", nextAt);
71  }
72 
73  if (repeat_tick == 0)
74  panic("Dist synchronisation interval must be greater than zero");
75 
76  if (repeat_tick < nextRepeat) {
77  nextRepeat = repeat_tick;
78  inform("Dist synchronisation interval is changed to %lu.\n",
79  nextRepeat);
80  }
81 }
82 
83 void
84 DistIface::Sync::abort()
85 {
86  std::unique_lock<std::mutex> sync_lock(lock);
87  waitNum = 0;
88  isAbort = true;
89  sync_lock.unlock();
90  cv.notify_one();
91 }
92 
93 DistIface::SyncSwitch::SyncSwitch(int num_nodes)
94 {
95  numNodes = num_nodes;
96  waitNum = num_nodes;
97  numExitReq = 0;
98  numCkptReq = 0;
99  numStopSyncReq = 0;
100  doExit = false;
101  doCkpt = false;
102  doStopSync = false;
103  nextAt = std::numeric_limits<Tick>::max();
104  nextRepeat = std::numeric_limits<Tick>::max();
105  isAbort = false;
106 }
107 
108 DistIface::SyncNode::SyncNode()
109 {
110  waitNum = 0;
111  needExit = ReqType::none;
112  needCkpt = ReqType::none;
113  needStopSync = ReqType::none;
114  doExit = false;
115  doCkpt = false;
116  doStopSync = false;
117  nextAt = std::numeric_limits<Tick>::max();
118  nextRepeat = std::numeric_limits<Tick>::max();
119  isAbort = false;
120 }
121 
122 bool
123 DistIface::SyncNode::run(bool same_tick)
124 {
125  std::unique_lock<std::mutex> sync_lock(lock);
126  Header header;
127 
128  assert(waitNum == 0);
129  assert(!isAbort);
130  waitNum = DistIface::recvThreadsNum;
131  // initiate the global synchronisation
132  header.msgType = MsgType::cmdSyncReq;
133  header.sendTick = curTick();
134  header.syncRepeat = nextRepeat;
135  header.needCkpt = needCkpt;
136  header.needStopSync = needStopSync;
137  if (needCkpt != ReqType::none)
138  needCkpt = ReqType::pending;
139  header.needExit = needExit;
140  if (needExit != ReqType::none)
141  needExit = ReqType::pending;
142  if (needStopSync != ReqType::none)
143  needStopSync = ReqType::pending;
144  DistIface::primary->sendCmd(header);
145  // now wait until all receiver threads complete the synchronisation
146  auto lf = [this]{ return waitNum == 0; };
147  cv.wait(sync_lock, lf);
148  // global synchronisation is done.
149  assert(isAbort || !same_tick || (nextAt == curTick()));
150  return !isAbort;
151 }
152 
153 
154 bool
155 DistIface::SyncSwitch::run(bool same_tick)
156 {
157  std::unique_lock<std::mutex> sync_lock(lock);
158  Header header;
159  // Wait for the sync requests from the nodes
160  if (waitNum > 0) {
161  auto lf = [this]{ return waitNum == 0; };
162  cv.wait(sync_lock, lf);
163  }
164  assert(waitNum == 0);
165  if (isAbort) // sync aborted
166  return false;
167  assert(!same_tick || (nextAt == curTick()));
168  waitNum = numNodes;
169  // Complete the global synchronisation
170  header.msgType = MsgType::cmdSyncAck;
171  header.sendTick = nextAt;
172  header.syncRepeat = nextRepeat;
173  if (doCkpt || numCkptReq == numNodes) {
174  doCkpt = true;
175  header.needCkpt = ReqType::immediate;
176  numCkptReq = 0;
177  } else {
178  header.needCkpt = ReqType::none;
179  }
180  if (doExit || numExitReq == numNodes) {
181  doExit = true;
182  header.needExit = ReqType::immediate;
183  } else {
184  header.needExit = ReqType::none;
185  }
186  if (doStopSync || numStopSyncReq == numNodes) {
187  doStopSync = true;
188  numStopSyncReq = 0;
189  header.needStopSync = ReqType::immediate;
190  } else {
191  header.needStopSync = ReqType::none;
192  }
193  DistIface::primary->sendCmd(header);
194  return true;
195 }
196 
197 bool
198 DistIface::SyncSwitch::progress(Tick send_tick,
199  Tick sync_repeat,
200  ReqType need_ckpt,
201  ReqType need_exit,
202  ReqType need_stop_sync)
203 {
204  std::unique_lock<std::mutex> sync_lock(lock);
205  if (isAbort) // sync aborted
206  return false;
207  assert(waitNum > 0);
208 
209  if (send_tick > nextAt)
210  nextAt = send_tick;
211  if (nextRepeat > sync_repeat)
212  nextRepeat = sync_repeat;
213 
214  if (need_ckpt == ReqType::collective)
215  numCkptReq++;
216  else if (need_ckpt == ReqType::immediate)
217  doCkpt = true;
218  if (need_exit == ReqType::collective)
219  numExitReq++;
220  else if (need_exit == ReqType::immediate)
221  doExit = true;
222  if (need_stop_sync == ReqType::collective)
223  numStopSyncReq++;
224  else if (need_stop_sync == ReqType::immediate)
225  doStopSync = true;
226 
227  waitNum--;
228  // Notify the simulation thread if the on-going sync is complete
229  if (waitNum == 0) {
230  sync_lock.unlock();
231  cv.notify_one();
232  }
233  // The receive thread must stay alive in the switch until the node
234  // closes the connection. Thus, we always return true here.
235  return true;
236 }
237 
238 bool
239 DistIface::SyncNode::progress(Tick max_send_tick,
240  Tick next_repeat,
241  ReqType do_ckpt,
242  ReqType do_exit,
243  ReqType do_stop_sync)
244 {
245  std::unique_lock<std::mutex> sync_lock(lock);
246  if (isAbort) // sync aborted
247  return false;
248  assert(waitNum > 0);
249 
250  nextAt = max_send_tick;
251  nextRepeat = next_repeat;
252  doCkpt = (do_ckpt != ReqType::none);
253  doExit = (do_exit != ReqType::none);
254  doStopSync = (do_stop_sync != ReqType::none);
255 
256  waitNum--;
257  // Notify the simulation thread if the on-going sync is complete
258  if (waitNum == 0) {
259  sync_lock.unlock();
260  cv.notify_one();
261  }
262  // The receive thread must finish when simulation is about to exit
263  return !doExit;
264 }
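The run()/progress() pair above is a condition-variable barrier: the thread calling run() sets waitNum and blocks until it reaches zero, while each receiver thread calling progress() decrements waitNum and notifies once the last message arrives. Below is a minimal standalone sketch of that handshake with made-up names and a single receiver; it is illustrative only and not part of dist_iface.cc.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex barrier_lock;
std::condition_variable barrier_cv;
unsigned wait_num = 1;                 // one "receiver" in this sketch

void receiver_progress()
{
    std::unique_lock<std::mutex> sync_lock(barrier_lock);
    wait_num--;                        // the peer's sync message arrived
    if (wait_num == 0) {
        sync_lock.unlock();
        barrier_cv.notify_one();       // wake the waiting simulation thread
    }
}

int main()
{
    std::thread recv_thread(receiver_progress);
    {
        std::unique_lock<std::mutex> sync_lock(barrier_lock);
        // wait with a predicate so a notify that arrives early is not lost
        barrier_cv.wait(sync_lock, []{ return wait_num == 0; });
    }
    recv_thread.join();
    std::cout << "sync complete" << std::endl;
    return 0;
}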
265 
266 void
267 DistIface::SyncNode::requestCkpt(ReqType req)
268 {
269  std::lock_guard<std::mutex> sync_lock(lock);
270  assert(req != ReqType::none);
271  if (needCkpt != ReqType::none)
272  warn("Ckpt requested multiple times (req:%d)\n", static_cast<int>(req));
273  if (needCkpt == ReqType::none || req == ReqType::immediate)
274  needCkpt = req;
275 }
276 
277 void
278 DistIface::SyncNode::requestExit(ReqType req)
279 {
280  std::lock_guard<std::mutex> sync_lock(lock);
281  assert(req != ReqType::none);
282  if (needExit != ReqType::none)
283  warn("Exit requested multiple times (req:%d)\n", static_cast<int>(req));
284  if (needExit == ReqType::none || req == ReqType::immediate)
285  needExit = req;
286 }
287 
288 void
289 DistIface::Sync::drainComplete()
290 {
291  if (doCkpt) {
292  // The first DistIface object called this right before writing the
293  // checkpoint. We need to drain the underlying physical network here.
294  // Note that other gem5 peers may enter this barrier at different
295  // ticks due to draining.
296  run(false);
297  // Only the "first" DistIface object has to perform the sync
298  doCkpt = false;
299  }
300 }
301 
302 void
303 DistIface::SyncNode::serialize(CheckpointOut &cp) const
304 {
305  int need_exit = static_cast<int>(needExit);
306  SERIALIZE_SCALAR(need_exit);
307 }
308 
309 void
310 DistIface::SyncNode::unserialize(CheckpointIn &cp)
311 {
312  int need_exit;
313  UNSERIALIZE_SCALAR(need_exit);
314  needExit = static_cast<ReqType>(need_exit);
315 }
316 
317 void
318 DistIface::SyncSwitch::serialize(CheckpointOut &cp) const
319 {
320  SERIALIZE_SCALAR(numExitReq);
321 }
322 
323 void
324 DistIface::SyncSwitch::unserialize(CheckpointIn &cp)
325 {
326  UNSERIALIZE_SCALAR(numExitReq);
327 }
328 
329 void
330 DistIface::SyncEvent::start()
331 {
332  // Note that this may be called either from startup() or drainResume()
333 
334  // At this point, all DistIface objects have already called Sync::init() so
335  // we have a local minimum of the start tick and repeat for the periodic
336  // sync.
337  repeat = DistIface::sync->nextRepeat;
338  // Do a global barrier to agree on a common repeat value (the smallest
339  // one from all participating nodes).
340  if (!DistIface::sync->run(false))
341  panic("DistIface::SyncEvent::start() aborted\n");
342 
343  assert(!DistIface::sync->doCkpt);
344  assert(!DistIface::sync->doExit);
345  assert(!DistIface::sync->doStopSync);
346  assert(DistIface::sync->nextAt >= curTick());
347  assert(DistIface::sync->nextRepeat <= repeat);
348 
349  if (curTick() == 0)
350  assert(!scheduled());
351 
352  // Use the maximum of the current tick for all participating nodes or a
353  // user provided starting tick.
354  if (scheduled())
355  reschedule(DistIface::sync->nextAt);
356  else
357  schedule(DistIface::sync->nextAt);
358 
359  inform("Dist sync scheduled at %lu and repeats %lu\n", when(),
360  DistIface::sync->nextRepeat);
361 }
362 
363 void
364 DistIface::SyncEvent::process()
365 {
366  // We may not start a global periodic sync while draining before taking a
367  // checkpoint. This is due to the possibility that peer gem5 processes
368  // may not hit the same periodic sync before they complete draining and
369  // that would make this periodic sync clash with sync called from
370  // DistIface::serialize() by other gem5 processes.
371  // We would need a 'distributed drain' solution to eliminate this
372  // restriction.
373  // Note that if draining was not triggered by checkpointing then we are
374  // fine since no extra global sync will happen (i.e. all peer gem5 will
375  // hit this periodic sync eventually).
376  panic_if(_draining && DistIface::sync->doCkpt,
377  "Distributed sync is hit while draining");
378  /*
379  * Note that this is a global event so this process method will be called
380  * by only exactly one thread.
381  */
382  /*
383  * We hold the eventq lock at this point but the receiver thread may
384  * need the lock to schedule new recv events while waiting for the
385  * dist sync to complete.
386  * Note that the other simulation threads also release their eventq
387  * locks while waiting for us due to the global event semantics.
388  */
389  {
390  EventQueue::ScopedRelease sr(curEventQueue());
391  // we do a global sync here that is supposed to happen at the same
392  // tick in all gem5 peers
393  if (!DistIface::sync->run(true))
394  return; // global sync aborted
395  // global sync completed
396  }
397  if (DistIface::sync->doCkpt)
398  exitSimLoop("checkpoint");
399  if (DistIface::sync->doExit) {
400  exitSimLoop("exit request from gem5 peers");
401  return;
402  }
403  if (DistIface::sync->doStopSync) {
404  DistIface::sync->doStopSync = false;
405  inform("synchronization disabled at %lu\n", curTick());
406 
407  // The switch node needs to wait for the next sync immediately.
408  if (DistIface::isSwitch) {
409  start();
410  } else {
411  // Wake up thread contexts on non-switch nodes.
412  for (auto *tc: primary->sys->threads) {
413  if (tc->status() == ThreadContext::Suspended)
414  tc->activate();
415  else
416  warn_once("Tried to wake up thread in dist-gem5, but it "
417  "was already awake!\n");
418  }
419  }
420  return;
421  }
422  // schedule the next periodic sync
423  repeat = DistIface::sync->nextRepeat;
424  schedule(curTick() + repeat);
425 }
426 
427 void
428 DistIface::RecvScheduler::init(Event *recv_done, Tick link_delay)
429 {
430  // This is called from the receiver thread when it starts running. The new
431  // receiver thread shares the event queue with the simulation thread
432  // (associated with the simulated Ethernet link).
433  curEventQueue(eventManager->eventQueue());
434 
435  recvDone = recv_done;
436  linkDelay = link_delay;
437 }
438 
439 Tick
440 DistIface::RecvScheduler::calcReceiveTick(Tick send_tick,
441  Tick send_delay,
442  Tick prev_recv_tick)
443 {
444  Tick recv_tick = send_tick + send_delay + linkDelay;
445  // sanity check (we need at least a send-delay-long window)
446  assert(recv_tick >= prev_recv_tick + send_delay);
447  panic_if(prev_recv_tick + send_delay > recv_tick,
448  "Receive window is smaller than send delay");
449  panic_if(recv_tick <= curTick(),
450  "Simulators out of sync - missed packet receive by %llu ticks "
451  "(prev_recv_tick: %lu send_tick: %lu send_delay: %lu "
452  "linkDelay: %lu )",
453  curTick() - recv_tick, prev_recv_tick, send_tick, send_delay,
454  linkDelay);
455 
456  return recv_tick;
457 }
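As a worked example of the arithmetic above, with made-up values not taken from the source: send_tick = 50000, send_delay = 2000 and linkDelay = 1000 give recv_tick = 53000, which satisfies the send-delay-wide receive window check. A standalone sketch (only the Tick alias mirrors gem5):

#include <cassert>
#include <cstdint>
#include <iostream>

using Tick = uint64_t;

int main()
{
    const Tick link_delay     = 1000;   // assumed simulated link latency
    const Tick send_tick      = 50000;  // tick the peer sent the packet at
    const Tick send_delay     = 2000;   // serialization delay on the peer link
    const Tick prev_recv_tick = 48000;  // receive tick of the previous packet

    Tick recv_tick = send_tick + send_delay + link_delay;   // 53000
    // the receive window must be at least one send_delay long
    assert(recv_tick >= prev_recv_tick + send_delay);
    std::cout << "recv_tick = " << recv_tick << std::endl;
    return 0;
}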
458 
459 void
460 DistIface::RecvScheduler::resumeRecvTicks()
461 {
462  // Schedule pending packets asap in case link speed/delay changed when
463  // restoring from the checkpoint.
464  // This may be done during unserialize except that curTick() is unknown
465  // so we call this during drainResume().
466  // If we are not restoring from a checkpoint then link latency cannot
467  // have changed, so we just return.
468  if (!ckptRestore)
469  return;
470 
471  std::vector<Desc> v;
472  while (!descQueue.empty()) {
473  Desc d = descQueue.front();
474  descQueue.pop();
475  d.sendTick = curTick();
476  d.sendDelay = d.packet->simLength; // assume 1 tick/byte max link speed
477  v.push_back(d);
478  }
479 
480  for (auto &d : v)
481  descQueue.push(d);
482 
483  if (recvDone->scheduled()) {
484  assert(!descQueue.empty());
485  eventManager->reschedule(recvDone, curTick());
486  } else {
487  assert(descQueue.empty() && v.empty());
488  }
489  ckptRestore = false;
490 }
491 
492 void
493 DistIface::RecvScheduler::pushPacket(EthPacketPtr new_packet,
494  Tick send_tick,
495  Tick send_delay)
496 {
497  // Note : this is called from the receiver thread
498  curEventQueue()->lock();
499  Tick recv_tick = calcReceiveTick(send_tick, send_delay, prevRecvTick);
500 
501  DPRINTF(DistEthernetPkt, "DistIface::recvScheduler::pushPacket "
502  "send_tick:%llu send_delay:%llu link_delay:%llu recv_tick:%llu\n",
503  send_tick, send_delay, linkDelay, recv_tick);
504  // Every packet must be sent and arrive in the same quantum
505  assert(send_tick > primary->syncEvent->when() -
506  primary->syncEvent->repeat);
507  // No packet may be scheduled for receive in the arrival quantum
508  assert(send_tick + send_delay + linkDelay > primary->syncEvent->when());
509 
510  // Now we are about to schedule a recvDone event for the new data packet.
511  // We use the same recvDone object for all incoming data packets. Packet
512  // descriptors are saved in the ordered queue. The currently scheduled
513  // packet is always on the top of the queue.
514  // NOTE: we use the event queue lock to protect the receive desc queue,
515  // too, which is accessed both by the receiver thread and the simulation
516  // thread.
517  descQueue.emplace(new_packet, send_tick, send_delay);
518  if (descQueue.size() == 1) {
519  assert(!recvDone->scheduled());
520  eventManager->schedule(recvDone, recv_tick);
521  } else {
522  assert(recvDone->scheduled());
523  panic_if(descQueue.front().sendTick + descQueue.front().sendDelay > recv_tick,
524  "Out of order packet received (recv_tick: %lu top(): %lu\n",
525  recv_tick, descQueue.front().sendTick + descQueue.front().sendDelay);
526  }
527  curEventQueue()->unlock();
528 }
529 
530 EthPacketPtr
531 DistIface::RecvScheduler::popPacket()
532 {
533  // Note : this is called from the simulation thread when a receive done
534  // event is being processed for the link. We assume that the thread holds
535  // the event queue queue lock when this is called!
536  EthPacketPtr next_packet = descQueue.front().packet;
537  descQueue.pop();
538 
539  if (descQueue.size() > 0) {
540  Tick recv_tick = calcReceiveTick(descQueue.front().sendTick,
541  descQueue.front().sendDelay,
542  curTick());
543  eventManager->schedule(recvDone, recv_tick);
544  }
545  prevRecvTick = curTick();
546  return next_packet;
547 }
548 
549 void
550 DistIface::RecvScheduler::Desc::serialize(CheckpointOut &cp) const
551 {
552  SERIALIZE_SCALAR(sendTick);
553  SERIALIZE_SCALAR(sendDelay);
554  packet->serialize("rxPacket", cp);
555 }
556 
557 void
558 DistIface::RecvScheduler::Desc::unserialize(CheckpointIn &cp)
559 {
560  UNSERIALIZE_SCALAR(sendTick);
561  UNSERIALIZE_SCALAR(sendDelay);
562  packet = std::make_shared<EthPacketData>();
563  packet->unserialize("rxPacket", cp);
564 }
565 
566 void
567 DistIface::RecvScheduler::serialize(CheckpointOut &cp) const
568 {
569  SERIALIZE_SCALAR(prevRecvTick);
570  // serialize the receive desc queue
571  std::queue<Desc> tmp_queue(descQueue);
572  unsigned n_desc_queue = descQueue.size();
573  assert(tmp_queue.size() == descQueue.size());
574  SERIALIZE_SCALAR(n_desc_queue);
575  for (int i = 0; i < n_desc_queue; i++) {
576  tmp_queue.front().serializeSection(cp, csprintf("rxDesc_%d", i));
577  tmp_queue.pop();
578  }
579  assert(tmp_queue.empty());
580 }
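serialize() above walks a copy of descQueue because std::queue has no iterators; popping the copy leaves the live queue intact. A self-contained illustration of that pattern using plain integers (hypothetical names, not gem5 API):

#include <iostream>
#include <queue>

int main()
{
    std::queue<int> desc_queue;
    for (int i = 0; i < 3; i++)
        desc_queue.push(i);

    // Walk a copy so the original queue keeps its contents.
    std::queue<int> tmp_queue(desc_queue);
    for (int i = 0; !tmp_queue.empty(); i++) {
        std::cout << "rxDesc_" << i << " = " << tmp_queue.front() << '\n';
        tmp_queue.pop();
    }
    std::cout << "live queue still holds " << desc_queue.size()
              << " descriptors\n";
    return 0;
}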
581 
582 void
583 DistIface::RecvScheduler::unserialize(CheckpointIn &cp)
584 {
585  assert(descQueue.size() == 0);
586  assert(!recvDone->scheduled());
587  assert(!ckptRestore);
588 
589  UNSERIALIZE_SCALAR(prevRecvTick);
590  // unserialize the receive desc queue
591  unsigned n_desc_queue;
592  UNSERIALIZE_SCALAR(n_desc_queue);
593  for (int i = 0; i < n_desc_queue; i++) {
594  Desc recv_desc;
595  recv_desc.unserializeSection(cp, csprintf("rxDesc_%d", i));
596  descQueue.push(recv_desc);
597  }
598  ckptRestore = true;
599 }
600 
601 DistIface::DistIface(unsigned dist_rank,
602  unsigned dist_size,
603  Tick sync_start,
604  Tick sync_repeat,
605  EventManager *em,
606  bool use_pseudo_op,
607  bool is_switch, int num_nodes) :
608  syncStart(sync_start), syncRepeat(sync_repeat),
609  recvThread(nullptr), recvScheduler(em), syncStartOnPseudoOp(use_pseudo_op),
610  rank(dist_rank), size(dist_size)
611 {
612  DPRINTF(DistEthernet, "DistIface() ctor rank:%d\n",dist_rank);
613  isPrimary = false;
614  if (primary == nullptr) {
615  assert(sync == nullptr);
616  assert(syncEvent == nullptr);
617  isSwitch = is_switch;
618  if (is_switch)
619  sync = new SyncSwitch(num_nodes);
620  else
621  sync = new SyncNode();
622  syncEvent = new SyncEvent();
623  primary = this;
624  isPrimary = true;
625  }
626  distIfaceId = distIfaceNum;
627  distIfaceNum++;
628 }
629 
630 DistIface::~DistIface()
631 {
632  assert(recvThread);
633  recvThread->join();
634  delete recvThread;
635  if (distIfaceNum-- == 0) {
636  assert(syncEvent);
637  delete syncEvent;
638  assert(sync);
639  delete sync;
640  }
641  if (this == primary)
642  primary = nullptr;
643 }
644 
645 void
646 DistIface::packetOut(EthPacketPtr pkt, Tick send_delay)
647 {
648  Header header;
649 
650  // Prepare a dist header packet for the Ethernet packet we want to
651  // send out.
652  header.msgType = MsgType::dataDescriptor;
653  header.sendTick = curTick();
654  header.sendDelay = send_delay;
655 
656  header.dataPacketLength = pkt->length;
657  header.simLength = pkt->simLength;
658 
659  // Send out the packet and the meta info.
660  sendPacket(header, pkt);
661 
662  DPRINTF(DistEthernetPkt,
663  "DistIface::sendDataPacket() done size:%d send_delay:%llu\n",
664  pkt->length, send_delay);
665 }
666 
667 void
668 DistIface::recvThreadFunc(Event *recv_done, Tick link_delay)
669 {
670  EthPacketPtr new_packet;
671  DistHeaderPkt::Header header;
672 
673  // Initialize receive scheduler parameters
674  recvScheduler.init(recv_done, link_delay);
675 
676  // Main loop to wait for and process any incoming message.
677  for (;;) {
678  // recvHeader() blocks until the next dist header packet comes in.
679  if (!recvHeader(header)) {
680  // We lost connection to the peer gem5 processes most likely
681  // because one of them called m5 exit. So we stop here.
682  // Grab the eventq lock to stop the simulation thread
683  curEventQueue()->lock();
684  exitSimLoop("connection to gem5 peer got closed");
685  curEventQueue()->unlock();
686  // The simulation thread may be blocked in processing an on-going
687  // global synchronisation. Abort the sync to give the simulation
688  // thread a chance to make progress and process the exit event.
689  sync->abort();
690  // Finish receiver thread
691  break;
692  }
693 
694  // We got a valid dist header packet, let's process it
695  if (header.msgType == MsgType::dataDescriptor) {
696  recvPacket(header, new_packet);
697  recvScheduler.pushPacket(new_packet,
698  header.sendTick,
699  header.sendDelay);
700  } else {
701  // everything else must be synchronisation related command
702  if (!sync->progress(header.sendTick,
703  header.syncRepeat,
704  header.needCkpt,
705  header.needExit,
706  header.needStopSync))
707  // Finish receiver thread if simulation is about to exit
708  break;
709  }
710  }
711 }
712 
713 void
714 DistIface::spawnRecvThread(const Event *recv_done, Tick link_delay)
715 {
716  assert(recvThread == nullptr);
717 
718  recvThread = new std::thread(&DistIface::recvThreadFunc,
719  this,
720  const_cast<Event *>(recv_done),
721  link_delay);
722  recvThreadsNum++;
723 }
724 
725 DrainState
726 DistIface::drain()
727 {
728  DPRINTF(DistEthernet,"DistIFace::drain() called\n");
729  // This can be called multiple times in the same drain cycle.
730  if (this == primary)
731  syncEvent->draining(true);
732  return DrainState::Drained;
733 }
734 
735 void
736 DistIface::drainResume() {
737  DPRINTF(DistEthernet,"DistIFace::drainResume() called\n");
738  if (this == primary)
739  syncEvent->draining(false);
740  recvScheduler.resumeRecvTicks();
741 }
742 
743 void
744 DistIface::serialize(CheckpointOut &cp) const
745 {
746  // Drain the dist interface before the checkpoint is taken. We cannot call
747  // this as part of the normal drain cycle because this dist sync has to be
748  // called exactly once after the system is fully drained.
749  sync->drainComplete();
750 
751  unsigned rank_orig = rank, dist_iface_id_orig = distIfaceId;
752 
753  SERIALIZE_SCALAR(rank_orig);
754  SERIALIZE_SCALAR(dist_iface_id_orig);
755 
756  recvScheduler.serializeSection(cp, "recvScheduler");
757  if (this == primary) {
758  sync->serializeSection(cp, "Sync");
759  }
760 }
761 
762 void
763 DistIface::unserialize(CheckpointIn &cp)
764 {
765  unsigned rank_orig, dist_iface_id_orig;
766  UNSERIALIZE_SCALAR(rank_orig);
767  UNSERIALIZE_SCALAR(dist_iface_id_orig);
768 
769  panic_if(rank != rank_orig, "Rank mismatch at resume (rank=%d, orig=%d)",
770  rank, rank_orig);
771  panic_if(distIfaceId != dist_iface_id_orig, "Dist iface ID mismatch "
772  "at resume (distIfaceId=%d, orig=%d)", distIfaceId,
773  dist_iface_id_orig);
774 
775  recvScheduler.unserializeSection(cp, "recvScheduler");
776  if (this == primary) {
777  sync->unserializeSection(cp, "Sync");
778  }
779 }
780 
781 void
782 DistIface::init(const Event *done_event, Tick link_delay)
783 {
784  // Init hook for the underlying message transport to set up/finalize
785  // communication channels
786  initTransport();
787 
788  // Spawn a new receiver thread that will process messages
789  // coming in from peer gem5 processes.
790  // The receive thread will also schedule a (receive) doneEvent
791  // for each incoming data packet.
792  spawnRecvThread(done_event, link_delay);
793 
794 
795  // Adjust the periodic sync start and interval. Different DistIface
796  // might have different requirements. The singleton sync object
797  // will select the minimum values for both params.
798  assert(sync != nullptr);
799  sync->init(syncStart, syncRepeat);
800 
801  // Initialize the seed for random generator to avoid the same sequence
802  // in all gem5 peer processes
803  assert(primary != nullptr);
804  if (this == primary)
805  random_mt.init(5489 * (rank+1) + 257);
806 }
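The seed expression above (5489 is the default Mersenne Twister seed) gives every rank a distinct, deterministic seed. Evaluated for a few ranks, purely as an illustration and not part of the file:

#include <cstdint>
#include <iostream>

int main()
{
    // Same formula as above: a distinct seed per gem5 rank.
    for (uint32_t rank = 0; rank < 4; rank++)
        std::cout << "rank " << rank << " -> seed "
                  << 5489 * (rank + 1) + 257 << '\n';
    // prints 5746, 11235, 16724, 22213
    return 0;
}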
807 
808 void
809 DistIface::startup()
810 {
811  DPRINTF(DistEthernet, "DistIface::startup() started\n");
812  // Schedule the periodic sync now unless its start is deferred to a pseudo-op and we are not a switch.
813  if (this == primary && (!syncStartOnPseudoOp || isSwitch))
814  syncEvent->start();
815  DPRINTF(DistEthernet, "DistIface::startup() done\n");
816 }
817 
818 bool
819 DistIface::readyToCkpt(Tick delay, Tick period)
820 {
821  bool ret = true;
822  DPRINTF(DistEthernet, "DistIface::readyToCkpt() called, delay:%lu "
823  "period:%lu\n", delay, period);
824  if (primary) {
825  if (delay == 0) {
826  inform("m5 checkpoint called with zero delay => triggering collaborative "
827  "checkpoint\n");
828  sync->requestCkpt(ReqType::collective);
829  } else {
830  inform("m5 checkpoint called with non-zero delay => triggering immediate "
831  "checkpoint (at the next sync)\n");
832  sync->requestCkpt(ReqType::immediate);
833  }
834  if (period != 0)
835  inform("Non-zero period for m5_ckpt is ignored in "
836  "distributed gem5 runs\n");
837  ret = false;
838  }
839  return ret;
840 }
841 
842 void
843 DistIface::SyncNode::requestStopSync(ReqType req)
844 {
845  std::lock_guard<std::mutex> sync_lock(lock);
846  needStopSync = req;
847 }
848 
849 void
850 DistIface::toggleSync(ThreadContext *tc)
851 {
852  // Unfortunately, we have to populate the system pointer member this way.
853  primary->sys = tc->getSystemPtr();
854 
855  // The invariant for both syncing and "unsyncing" is that all threads will
856  // stop executing instructions until the desired sync state has been reached
857  // for all nodes. This is the easiest way to prevent deadlock (in the case
858  // of "unsyncing") and causality errors (in the case of syncing).
859  if (primary->syncEvent->scheduled()) {
860  inform("Request toggling syncronization off\n");
861  sync->requestStopSync(ReqType::collective);
862 
863  // At this point, we have no clue when everyone will reach the sync
864  // stop point. Suspend execution of all local thread contexts.
865  // Dist-gem5 will reactivate all thread contexts when everyone has
866  // reached the sync stop point.
867 #if THE_ISA != NULL_ISA
868  for (auto *tc: primary->sys->threads) {
869  if (tc->status() == ThreadContext::Active)
870  tc->quiesce();
871  }
872 #endif
873  } else {
874  inform("Request toggling syncronization on\n");
875  primary->syncEvent->start();
876 
877  // We need to suspend all CPUs until the sync point is reached by all
878  // nodes to prevent causality errors. We can also schedule CPU
879  // activation here, since we know exactly when the next sync will
880  // occur.
881 #if THE_ISA != NULL_ISA
882  for (auto *tc: primary->sys->threads) {
883  if (tc->status() == ThreadContext::Active)
884  tc->quiesceTick(primary->syncEvent->when() + 1);
885  }
886 #endif
887  }
888 }
889 
890 bool
891 DistIface::readyToExit(Tick delay)
892 {
893  bool ret = true;
894  DPRINTF(DistEthernet, "DistIface::readyToExit() called, delay:%lu\n",
895  delay);
896  if (primary) {
897  // To successfully coordinate an exit, all nodes must be synchronising
898  if (!primary->syncEvent->scheduled())
899  primary->syncEvent->start();
900 
901  if (delay == 0) {
902  inform("m5 exit called with zero delay => triggering collaborative "
903  "exit\n");
904  sync->requestExit(ReqType::collective);
905  } else {
906  inform("m5 exit called with non-zero delay => triggering immediate "
907  "exit (at the next sync)\n");
908  sync->requestExit(ReqType::immediate);
909  }
910  ret = false;
911  }
912  return ret;
913 }
914 
915 uint64_t
916 DistIface::rankParam()
917 {
918  uint64_t val;
919  if (primary) {
920  val = primary->rank;
921  } else {
922  warn("Dist-rank parameter is queried in single gem5 simulation.");
923  val = 0;
924  }
925  return val;
926 }
927 
928 uint64_t
929 DistIface::sizeParam()
930 {
931  uint64_t val;
932  if (primary) {
933  val = primary->size;
934  } else {
935  warn("Dist-size parameter is queried in single gem5 simulation.");
936  val = 1;
937  }
938  return val;
939 }