====== Writer ======

===== FIFO =====

  * Atomic push() and pop() operations
  * One reader, many writers

Implementations:

  * [[http://scholar.google.com/scholar?cluster=17227025985280150504&hl=en&as_sdt=0,5|Simple, fast, and practical non-blocking and blocking concurrent queue algorithms]] \\ simple
  * [[http://scholar.google.com/scholar?cluster=9029243256979780697&hl=en&as_sdt=0,5&sciodt=0,5|An Optimistic Approach to Lock-Free FIFO Queues]] \\ faster under many concurrent accesses (not needed here)
  * [[http://blog.lse.epita.fr/articles/42-implementing-generic-double-word-compare-and-swap-.html|C++ implementation]] of the simple variant.

D only supports CAS on up to 64 bits, regardless of whether the architecture is 32-bit or 64-bit. A 64-bit architecture could, however, also do a 128-bit CAS. Because both implementations need a double-word CAS, only a 32-bit architecture is possible.

===== Code =====

  FIFO writeBuf = []
  byte[] writeRemaining = []
  int writeUserCnt = 0
  //
  // Runs when writeFd is enabled: drains the FIFO into the socket.
  writeFdEnabled() {
      round = 0
      next = writeBuf.pop()
      while (next != null) {
          atomicOp!"+="(writeUserCnt, 1)
          // Fill the send buffer with pieces from the FIFO up to a limit.
          // The buffer can also contain previously unsent data.
          while (len(writeRemaining) < 1024 && next != null) { // Or the size of the send buffer?
              writeRemaining += next
              next = writeBuf.pop()
          }
          // Send the buffer
          i = send(writeRemaining)
          if (i > 0) {
              // At least part of the buffer was sent: remove the sent part
              writeRemaining = writeRemaining[i:]
          }
          else if (i == -1) {
              atomicOp!"-="(writeUserCnt, 1)
              if (errno == EWOULDBLOCK) {
                  if (round == 0) {
                      debug("Write should not block on the first attempt")
                  }
                  break
              }
              debug("Error " ~ errno ~ " on write")
              close()
              break
          }
          else {
              // Unexpected behaviour!
              debug("Unknown send exit code " ~ i)
              close()
              break
          }
          round++
          i = atomicOp!"-="(writeUserCnt, 1)
          if (i == 0) {
              // Only fetch a new piece if none is pending; otherwise the piece
              // left over from the fill loop above would be lost.
              if (next == null) {
                  next = writeBuf.pop()
              }
          }
          else if (i < 0) {
              debug("Too many people left the room!")
              close()
              break
          }
      }
      enableFd(reader)
      // Somehow notify the game as well?
      // Send accumulated messages
      // Do we also get here via an error case?
      // Do not enable two readers!
      if (round == 0) {
          debug("Should not be woken up before FIFO has data")
      }
  }
  //
  // Called by any writer to queue data for sending.
  write(byte[] data) {
      atomicOp!"+="(writeUserCnt, 1)
      disableFd(reader)
      // Somehow notify the game as well?
      // Abort the read process
      amIFirst = writeBuf.push(data)
      i = atomicOp!"-="(writeUserCnt, 1)
      // I am the last one
      if (i == 0) {
          if (amIFirst) {
              enableFd(writeFd)
          }
      }
      else if (i < 0) {
          debug("Too many people left the room!")
          close()
          return
      }
  }
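
The write() path above depends on two things: push() reporting whether the FIFO was empty beforehand, and the user counter dropping back to zero. The following is a minimal C++ sketch of just that handshake, written in C++ because the linked reference implementation is C++. It is a blocking stand-in that uses a mutex, not the lock-free algorithms from the papers. The names ''Fifo'', ''Writer'' and ''write_fd_enabled'' are hypothetical, enabling the fd is only modelled as setting a flag, and the disableFd(reader) step is left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

using Bytes = std::vector<std::uint8_t>;

// Blocking stand-in for the FIFO (assumption: a mutex replaces the CAS loop).
class Fifo {
public:
    // Appends data; returns true if the queue was empty before this push.
    bool push(Bytes data) {
        std::lock_guard<std::mutex> lock(mutex_);
        const bool was_empty = items_.empty();
        items_.push_back(std::move(data));
        return was_empty;
    }

    // Moves the oldest element into out; returns false if the queue is empty.
    bool pop(Bytes& out) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return false;
        }
        out = std::move(items_.front());
        items_.pop_front();
        return true;
    }

private:
    std::mutex mutex_;
    std::deque<Bytes> items_;
};

// Hypothetical holder for the state used by write() in the pseudocode.
struct Writer {
    Fifo write_buf;
    std::atomic<int> write_user_cnt{0};
    std::atomic<bool> write_fd_enabled{false};  // stands in for enableFd(writeFd)

    // Queues data; returns false if the user counter went negative.
    bool write(Bytes data) {
        write_user_cnt.fetch_add(1);
        const bool am_i_first = write_buf.push(std::move(data));
        // fetch_sub returns the old value, so subtract 1 to get the new one.
        const int remaining = write_user_cnt.fetch_sub(1) - 1;
        if (remaining < 0) {
            return false;  // "Too many people left the room!"
        }
        if (remaining == 0 && am_i_first) {
            write_fd_enabled.store(true);  // last user out, and the FIFO was empty
        }
        return true;
    }
};
```

In this sketch only a writer that pushed into an empty FIFO and is also the last active user sets the flag, so a second write() into a FIFO that still holds data leaves the flag untouched.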