(* Queues.Mod *)
(* Lock-free queues *)
(* Copyright (C) Florian Negele *)
  3. MODULE Queues;
  4. IMPORT CPU, Processors;
  (* Slot indices into the per-processor hazard and pooled arrays. *)
  (* First and Last both denote slot 0, whereas Next denotes slot 1. *)
  CONST
    First = 0;
    Last = 0;
    Next = 1;

  (* A singly linked queue node that refers to its successor and to the element it carries. *)
  TYPE Node = POINTER {DISPOSABLE} TO RECORD
    next: Node;
    item: Item
  END;
  (** Base type of all elements that can be stored in a queue. *)
  TYPE Item* = OBJECT {DISPOSABLE}
    (* node kept with the element: Dequeue stores the previous sentinel here and Enqueue takes it back for reuse *)
    VAR node {UNTRACED} := NIL: Node;

    (** Finalizes the element by releasing the node that is still associated with it, if any. *)
    PROCEDURE ~Finalize-;
    BEGIN {UNCOOPERATIVE, UNCHECKED}
      (* Acquire yields FALSE if there is no node left that could be disposed *)
      IF Acquire (node) THEN
        node.next := NIL;
        node.item := NIL;
        DISPOSE (node);
      END;
    END Finalize;
  END Item;
  (** A first-in first-out container of elements. *)
  (** Both node references start out as NIL; the sentinel node is installed on the first call to Enqueue. *)
  TYPE Queue* = RECORD
    first := NIL, last := NIL: Node
  END;
  (** A first-in first-out container whose storage is aligned to the size of a cache line for optimal cache behavior. *)
  TYPE AlignedQueue* = RECORD {ALIGNED (CPU.CacheLineSize)} (Queue)
  END;
  (* Per-processor bookkeeping, indexed by the processor index. *)
  (* hazard: nodes the processor is currently accessing (set by Access, cleared by Discard). *)
  (* pooled: nodes that Acquire exchanged for a node that was still in use. *)
  (* dummy: empty cache-line aligned field; presumably keeps the entries of different processors apart - TODO confirm. *)
  VAR processors: ARRAY Processors.Maximum OF RECORD
    hazard {UNTRACED}, pooled {UNTRACED}: ARRAY 2 OF Node;
    dummy {ALIGNED (CPU.CacheLineSize)} : RECORD END
  END;
  (* Tries to exchange the contents of a shared node variable with the given node. *)
  (* If the shared variable was modified in between, nothing is exchanged and node keeps its value. *)
  PROCEDURE Swap (VAR shared {UNTRACED}, node {UNTRACED}: Node);
  VAR observed {UNTRACED}: Node;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    (* comparing and swapping NIL against NIL reads the shared value without changing it *)
    observed := CAS (shared, NIL, NIL);
    (* hand out the observed value only if the given node was actually stored *)
    IF CAS (shared, observed, node) = observed THEN
      node := observed;
    END;
  END Swap;
  (* Ensures that the given node does not match any hazard pointer of any processor. *)
  (* Whenever a match is found, the node is swapped against the pooled node of that processor *)
  (* and the scan over all processors starts again from the beginning. *)
  (* Returns TRUE if the caller is left with a node, and FALSE if node is NIL. *)
  PROCEDURE Acquire (VAR node {UNTRACED}: Node): BOOLEAN;
  VAR current := 0: SIZE;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    WHILE (node # NIL) & (current # Processors.Maximum) DO
      IF node = processors[current].hazard[First] THEN
        Swap (processors[current].pooled[First], node);
        current := 0;
      ELSIF node = processors[current].hazard[Next] THEN
        Swap (processors[current].pooled[Next], node);
        current := 0;
      ELSE
        INC (current);
      END;
    END;
    RETURN node # NIL;
  END Acquire;
  (* Installs a sentinel node in a queue that does not have one yet. *)
  (* On return, sentinel refers to the node stored as the last node of the queue, *)
  (* which is either the node provided here or the one installed concurrently by someone else. *)
  PROCEDURE Initialize (VAR queue: Queue; VAR sentinel: Node);
  VAR previous: Node;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    (* allocate a fresh node unless a usable one was passed in *)
    IF ~Acquire (sentinel) THEN
      NEW (sentinel);
      ASSERT (sentinel # NIL);
    END;
    (* only the first caller succeeds in replacing NIL by its sentinel *)
    previous := CAS (queue.last, NIL, sentinel);
    IF previous # NIL THEN
      (* somebody else was faster, use the existing node instead *)
      DISPOSE (sentinel);
      sentinel := previous;
    ELSE
      (* the first node must still be unset at this point *)
      ASSERT (CAS (queue.first, NIL, sentinel) = NIL);
    END;
  END Initialize;
  (* Publishes node in the given hazard slot of the current processor and re-reads reference *)
  (* until the published node equals the value stored in reference. *)
  (* On return, node holds that value. *)
  PROCEDURE Access (VAR node, reference: Node; pointer: SIZE);
  VAR observed: Node; processor: SIZE;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    processor := Processors.GetCurrentIndex ();
    LOOP
      (* announce the node before validating it against the shared reference *)
      processors[processor].hazard[pointer] := node;
      observed := CAS (reference, NIL, NIL);
      IF observed = node THEN EXIT END;
      (* the reference changed in the meantime, retry with its new value *)
      node := observed;
    END;
  END Access;
  (* Clears the given hazard slot of the current processor. *)
  PROCEDURE Discard (pointer: SIZE);
  VAR processor: SIZE;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    processor := Processors.GetCurrentIndex ();
    processors[processor].hazard[pointer] := NIL;
  END Discard;
  (** Adds an element to the back of a queue. The element must not be NIL. *)
  PROCEDURE Enqueue- (item: Item; VAR queue: Queue);
  VAR node {UNTRACED}, tail, successor: Node;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    (* reject invalid arguments *)
    ASSERT (item # NIL);

    (* take over the node kept with the element, or allocate a new one if it cannot be used *)
    node := item.node;
    item.node := NIL;
    IF ~Acquire (node) THEN
      NEW (node);
      ASSERT (node # NIL);
    END;
    node.next := NIL;
    node.item := item;

    LOOP
      (* make sure the queue has a sentinel node *)
      tail := CAS (queue.last, NIL, NIL);
      IF tail = NIL THEN Initialize (queue, tail) END;

      (* try to link the new node behind the last node *)
      Access (tail, queue.last, Last);
      successor := CAS (tail.next, NIL, node);
      IF successor = NIL THEN EXIT END;

      (* the last node already has a successor, so advance the last node of the queue first *)
      IF CAS (queue.last, tail, successor) # tail THEN CPU.Backoff END;
    END;

    (* make the new node the last node of the queue *)
    ASSERT (CAS (queue.last, tail, node) # NIL);
    Discard (Last);
  END Enqueue;
  (** Takes the element at the front of a queue and returns it in a variable parameter. *)
  (** Returns FALSE and sets the variable parameter to NIL if the queue contains no elements. *)
  PROCEDURE Dequeue- (VAR item: Item; VAR queue: Queue): BOOLEAN;
  VAR head, successor, tail: Node;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    LOOP
      (* a queue without sentinel node has never been used *)
      head := CAS (queue.first, NIL, NIL);
      IF head = NIL THEN
        item := NIL;
        RETURN FALSE;
      END;

      (* protect the sentinel node and its successor while they are inspected *)
      Access (head, queue.first, First);
      successor := CAS (head.next, NIL, NIL);
      Access (successor, head.next, Next);

      (* a sentinel node without successor denotes an empty queue *)
      IF successor = NIL THEN
        item := NIL;
        Discard (First);
        Discard (Next);
        RETURN FALSE;
      END;

      (* keep the queue consistent by advancing its last node if that is still the sentinel *)
      tail := CAS (queue.last, head, successor);
      item := successor.item;

      (* the successor becomes the new sentinel node *)
      IF CAS (queue.first, head, successor) = head THEN EXIT END;
      Discard (Next);
      CPU.Backoff;
    END;

    (* hand the previous sentinel node over to the dequeued element *)
    item.node := head;
    Discard (First);
    Discard (Next);
    RETURN TRUE;
  END Dequeue;
  (** Disposes the elements that are still stored in a queue and resets the queue. *)
  PROCEDURE Dispose- (VAR queue {UNTRACED}: Queue);
  VAR current {UNTRACED}, successor {UNTRACED}: Node;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    (* nothing to do for a queue without sentinel node *)
    current := queue.first;
    IF current = NIL THEN RETURN END;

    (* detach the chain of nodes from the queue *)
    successor := current.next;
    queue.first := NIL;
    queue.last := NIL;

    (* the sentinel node is cleared and disposed directly *)
    current.next := NIL;
    current.item := NIL;
    DISPOSE (current);

    (* dispose the element of each remaining node *)
    (* NOTE(review): only the item is disposed here, not the node itself - confirm that these nodes are reclaimed elsewhere *)
    WHILE successor # NIL DO
      current := successor;
      successor := current.next;
      DISPOSE (current.item);
    END;
  END Dispose;
  (** Terminates the module by disposing all pooled nodes of all processors. *)
  (** @topic Runtime Call *)
  PROCEDURE Terminate-;
  VAR processor, slot: SIZE;
  BEGIN {UNCOOPERATIVE, UNCHECKED}
    FOR processor := 0 TO Processors.Maximum - 1 DO
      FOR slot := First TO Next DO
        (* no processor may still be accessing a node at this point *)
        ASSERT (processors[processor].hazard[slot] = NIL);
        IF processors[processor].pooled[slot] # NIL THEN
          DISPOSE (processors[processor].pooled[slot]);
        END;
      END;
    END;
  END Terminate;
  133. END Queues.