2
0
Quellcode durchsuchen

очередь в монге!

κρμγ vor 5 Jahren
Ursprung
Commit
ca179f6688

+ 3 - 1
db/2020-05/indexes.js

@@ -1 +1,3 @@
-db.getCollection("coin-to-account-link").createIndex({coinId: 1, accountId: 1}, {unique: true})
+db.getCollection("coin-to-account").createIndex({coinId: 1, accountId: 1}, {unique: true})
+db.getCollection("coin").createIndex({"status": 1, "coin.value": 1, "coin.era": 1})
+db.getCollection("coin").createIndex({"coin._id": 1})

+ 4 - 0
pom.xml

@@ -187,6 +187,10 @@
                     </execution>
                 </executions>
                 <configuration>
+                    <sourceDirs>
+                        <sourceDir>src/main/java</sourceDir>
+                        <sourceDir>src/main/kotlin</sourceDir>
+                    </sourceDirs>
                     <compilerPlugins>
                         <plugin>spring</plugin>
                     </compilerPlugins>

+ 2 - 0
src/main/java/inn/ocsf/bee/freigeld/core/model/Bank.java

@@ -24,4 +24,6 @@ public interface Bank extends PrivatePerson {
 
     @NotNull
     Stream<BankAccount> getAccounts();
+
+    BankAccount getAccount(@NotNull UUID accountId);
 }

+ 1 - 4
src/main/java/inn/ocsf/bee/freigeld/core/model/BankEvent.java

@@ -1,7 +1,4 @@
 package inn.ocsf.bee.freigeld.core.model;
 
-import java.util.UUID;
-
-public interface BankEvent {
-    UUID getId();
+public abstract class BankEvent extends Ticket {
 }

+ 3 - 3
src/main/java/inn/ocsf/bee/freigeld/core/model/Emitter.java

@@ -10,8 +10,8 @@ import java.util.stream.Collectors;
 
 public interface Emitter {
 
-    default void emit(long count) {
-        emit(count, CoinValue.one);
+    default void emit(long count, @NotNull CoinValue value) {
+        emit(count, value, 0);
     }
 
     default Collection<UUID> emit(@NotNull Collection<Coin> futureCoins) {
@@ -20,7 +20,7 @@ public interface Emitter {
         }).map(this::emitOne).collect(Collectors.toList());
     }
 
-    void emit(long count, @NotNull CoinValue value);
+    void emit(long count, @NotNull CoinValue value, int era);
 
     UUID emitOne(@NotNull Coin coin);
 

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/EmitterEvent.java

@@ -1,4 +1,4 @@
 package inn.ocsf.bee.freigeld.core.model;
 
-public interface EmitterEvent {
+public abstract class EmitterEvent extends Ticket {
 }

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/EmitterStartRecalculationEvent.java

@@ -1,4 +1,4 @@
 package inn.ocsf.bee.freigeld.core.model;
 
-public class EmitterStartRecalculationEvent implements EmitterEvent {
+public class EmitterStartRecalculationEvent extends EmitterEvent {
 }

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/EmitterStopRecalculationEvent.java

@@ -4,7 +4,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
-public class EmitterStopRecalculationEvent implements EmitterEvent {
+public class EmitterStopRecalculationEvent extends EmitterEvent {
 
     private Set<UUID> nullCoins = new HashSet<>();
 

+ 26 - 0
src/main/java/inn/ocsf/bee/freigeld/core/model/Ticket.java

@@ -0,0 +1,26 @@
+package inn.ocsf.bee.freigeld.core.model;
+
+import com.querydsl.core.annotations.QueryEntity;
+
+import java.util.UUID;
+
+@QueryEntity
+public abstract class Ticket {
+
+    private UUID id;
+
+    public Ticket() {
+    }
+
+    public Ticket(UUID id) {
+        this.id = id;
+    }
+
+    public UUID getId() {
+        return id;
+    }
+
+    public void setId(UUID id) {
+        this.id = id;
+    }
+}

+ 5 - 0
src/main/java/inn/ocsf/bee/freigeld/core/model/TicketStatus.java

@@ -0,0 +1,5 @@
+package inn.ocsf.bee.freigeld.core.model;
+
+public enum TicketStatus {
+    created, completed
+}

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/data/CoinAccountLink.java

@@ -8,7 +8,7 @@ import org.springframework.data.mongodb.core.mapping.Document;
 import java.util.Date;
 import java.util.UUID;
 
-@Document("coin-to-account-link")
+@Document("coin-to-account")
 public class CoinAccountLink {
 
     @Id

+ 68 - 0
src/main/java/inn/ocsf/bee/freigeld/core/model/data/TicketData.java

@@ -0,0 +1,68 @@
+package inn.ocsf.bee.freigeld.core.model.data;
+
+import inn.ocsf.bee.freigeld.core.model.Ticket;
+import inn.ocsf.bee.freigeld.core.model.TicketStatus;
+import org.bson.types.ObjectId;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.annotation.Version;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.Date;
+
+@Document("ticket")
+public class TicketData {
+
+    @Id
+    private ObjectId id;
+
+    @Version
+    private Long version;
+
+    private TicketStatus status;
+
+    private Ticket ticket;
+
+    private String topic;
+
+    private Date ts;
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public ObjectId getId() {
+        return id;
+    }
+
+    public void setId(ObjectId id) {
+        this.id = id;
+    }
+
+    public TicketStatus getStatus() {
+        return status;
+    }
+
+    public void setStatus(TicketStatus status) {
+        this.status = status;
+    }
+
+    public Ticket getTicket() {
+        return ticket;
+    }
+
+    public void setTicket(Ticket ticket) {
+        this.ticket = ticket;
+    }
+
+    public Date getTs() {
+        return ts;
+    }
+
+    public void setTs(Date ts) {
+        this.ts = ts;
+    }
+}

+ 19 - 0
src/main/java/inn/ocsf/bee/freigeld/core/repo/TicketRepository.java

@@ -0,0 +1,19 @@
+package inn.ocsf.bee.freigeld.core.repo;
+
+import inn.ocsf.bee.freigeld.core.model.TicketStatus;
+import inn.ocsf.bee.freigeld.core.model.data.TicketData;
+import org.bson.types.ObjectId;
+import org.springframework.data.mongodb.repository.MongoRepository;
+import org.springframework.data.querydsl.QuerydslPredicateExecutor;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.UUID;
+
+public interface TicketRepository extends MongoRepository<TicketData, ObjectId>, QuerydslPredicateExecutor<TicketData> {
+
+    Optional<TicketData> findFirstByTopicAndStatusInOrderByIdAsc(String topic, Collection<TicketStatus> status);
+
+    Optional<TicketData> findOneByTicketId(UUID ticket_id);
+
+}

+ 14 - 10
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/CentralBankAccountLevel.kt

@@ -33,7 +33,7 @@ abstract class CentralBankAccountLevel : CentralBank {
     @Inject
     private lateinit var linkRepo: CoinAccountLinkRepository
 
-    protected val selfId = UUID.fromString("a8486777-834c-4267-9f10-69f03714c523")
+    protected val bankId = UUID.fromString("a8486777-834c-4267-9f10-69f03714c523")
 
     private val log = LoggerFactory.getLogger(javaClass)
 
@@ -70,38 +70,42 @@ abstract class CentralBankAccountLevel : CentralBank {
     fun init() {
         accountRepo.deleteAll()
         linkRepo.deleteAll()
-        val thisBank = world.getPerson(selfId) ?: world.addPerson(PersonData.BankPrivate(selfId))
-        val thisAccounts = getAccounts(selfId)
+        val thisBank = world.getPerson(bankId) ?: world.addPerson(PersonData.BankPrivate(bankId))
+        val thisAccounts = getAccounts(bankId)
         if (thisAccounts.isEmpty()) {
-            addAccount(selfId)
+            addAccount(bankId)
         }
     }
 
     override fun addAccount(personId: UUID): BankAccount {
         val ad = BankAccountData()
-        ad.bankId = selfId
+        ad.bankId = bankId
         ad.account = BankAccountData.BankAccountDefault(UUID.randomUUID(), personId)
         return delegateAccount(accountRepo.save(ad).account)
     }
 
     override fun getId(): UUID {
-        return selfId
+        return bankId
     }
 
     override fun hasPerson(person: UUID): Boolean {
-        return accountRepo.count(Expressions.allOf(bankAccountData.bankId.eq(selfId), bankAccountData.account.ownerId.eq(person))) > 0
+        return accountRepo.count(Expressions.allOf(bankAccountData.bankId.eq(bankId), bankAccountData.account.ownerId.eq(person))) > 0
     }
 
     override fun hasAccount(account: UUID): Boolean {
-        return accountRepo.exists(Expressions.allOf(bankAccountData.bankId.eq(selfId), bankAccountData.account.id.eq(account)))
+        return accountRepo.exists(Expressions.allOf(bankAccountData.bankId.eq(bankId), bankAccountData.account.id.eq(account)))
     }
 
     override fun getAccounts(person: UUID): MutableCollection<BankAccount> {
-        return accountRepo.findAll(Expressions.allOf(bankAccountData.bankId.eq(selfId), bankAccountData.account.ownerId.eq(person))).map { delegateAccount(it.account) }.toMutableList()
+        return accountRepo.findAll(Expressions.allOf(bankAccountData.bankId.eq(bankId), bankAccountData.account.ownerId.eq(person))).map { delegateAccount(it.account) }.toMutableList()
     }
 
     override fun getAccounts(): Stream<BankAccount> {
-        return StreamSupport.stream(accountRepo.findAll(bankAccountData.bankId.eq(selfId)).spliterator(), false).map { delegateAccount(it.account) }
+        return StreamSupport.stream(accountRepo.findAll(bankAccountData.bankId.eq(bankId)).spliterator(), false).map { delegateAccount(it.account) }
+    }
+
+    override fun getAccount(accountId: UUID): BankAccount? {
+        return accountRepo.findOne(Expressions.allOf(bankAccountData.bankId.eq(bankId), bankAccountData.account.id.eq(accountId))).map { delegateAccount(it.account) }.orElse(null)
     }
 }
 

+ 297 - 0
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/CentralBankQueueLevel.kt

@@ -0,0 +1,297 @@
+package inn.ocsf.bee.freigeld.core.data
+
+import inn.ocsf.bee.freigeld.core.calc.CoinStrategy
+import inn.ocsf.bee.freigeld.core.calc.CoinUtils
+import inn.ocsf.bee.freigeld.core.model.*
+import inn.ocsf.bee.freigeld.core.service.TicketService
+import kotlinx.coroutines.*
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.stereotype.Service
+import java.util.*
+import java.util.concurrent.CompletableFuture
+import javax.annotation.PostConstruct
+import javax.inject.Inject
+
+@Service
+class CentralBankQueueLevel : CentralBankAccountLevel() {
+
+    companion object {
+        val bankTicketChannelName = Bank::class.java.name
+    }
+
+    @Autowired
+    private lateinit var emitter: GlobalEmitter
+
+    @Inject
+    private lateinit var ticketService: TicketService
+
+    val globalFutureMap = mutableMapOf<UUID, CompletableDeferred<Any>>()
+
+    private val log = LoggerFactory.getLogger(javaClass)
+
+    override fun exchange(to: BankAccount, from: BankAccount, amount: Long): CompletableFuture<Any> {
+        val e = ExchangeStartEvent(UUID.randomUUID(), to, from, amount)
+        val ret = CompletableFuture<Any>()
+        globalFutureMap.computeIfAbsent(e.id) { id ->
+            val def = CompletableDeferred<Any>()
+            def.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(null) else ret.completeExceptionally(t) }
+            def
+        }
+        ticketService.offerLast(bankTicketChannelName, e)
+        return ret
+    }
+
+    suspend fun pollLater(e: BankEvent): Boolean {
+        return when (e) {
+            is ExchangeStartEvent -> {
+                val eFrom = e.from?.let { getAccount(it) } ?: emitter
+                val eTo = e.to?.let { getAccount(it) }!!
+                val eAmount = e.amount ?: 0L
+                val selfIds = getAccounts(id).map { it.id }.toSet()
+                CoinUtils.makeSomeStrategy(CoinStrategy(eAmount, if (e.from != emitter.id) eFrom.coins else null, if (!selfIds.contains(e.from)) eTo.coins else null)).let { st ->
+                    if (st.res!!.isOk()) {
+                        st
+                    } else if (st.res!!.isErr()) {
+                        ticketService.offerLast(bankTicketChannelName, ExchangeFailedEvent(UUID.randomUUID(), e, "strategy error ${st.res}"))
+                        null
+                    } else {
+                        TODO()
+                    }
+                }?.let { st ->
+                    log.info("${id} do ${st.res?.name} transaction: ${e.from} -> ${e.to} = ${e.amount}")
+                    var fromAmount = eFrom.overall
+                    var toAmount = eTo.overall
+
+                    val leftExtracted = if (st.leftExtract != null) {
+                        val old = eFrom.extract(st.leftExtract?.map { it.id }!!)
+                        //emitter.accept(old) later
+                        old
+                    } else {
+                        listOf()
+                    }
+
+                    if (st.leftEmit != null) {
+                        val newIds = emitter.emit(st.leftEmit!!)
+                        val newCoins = emitter.extract(newIds)
+                        eFrom.accept(newCoins)
+                        if (!selfIds.contains(e.from)) {
+                            emitter.free(newCoins)
+                        } else {
+                            //do nothing
+                        }
+                    }
+
+                    val rightExtracted = if (st.rightExtract != null) {
+                        val old = eTo.extract(st.rightExtract?.map { it.id }!!)
+                        //emitter.accept(old) later
+                        old
+                    } else {
+                        listOf()
+                    }
+
+                    if (st.rightEmit != null) {
+                        val newIds = emitter.emit(st.rightEmit!!)
+                        if (e.from == emitter.id) {
+                            fromAmount = eFrom.overall
+                        }
+                        val newCoins = emitter.extract(newIds)
+                        eTo.accept(newCoins)
+                        if (!selfIds.contains(e.to)) {
+                            emitter.free(newCoins)
+                        } else {
+                            //do nothing
+                        }
+                    }
+
+                    if (st.rightCashback != null) {
+                        val localCoins = leftExtracted.map { it.id to it }.toMap().toMutableMap()
+                        val externalCoins = emitter.extract(st.rightCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
+                        val sameCoins = st.rightCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
+                        val someCoins = sameCoins + externalCoins
+                        eTo.accept(someCoins)
+                        someCoins.forEach { localCoins.remove(it.id) }
+                        emitter.accept(localCoins.values)
+                        if (!selfIds.contains(e.to)) {
+                            emitter.free(someCoins)
+                        } else {
+                            //do nothing
+                        }
+                    }
+
+                    if (st.leftCashback != null) {
+                        val localCoins = rightExtracted.map { it.id to it }.toMap().toMutableMap()
+                        val externalCoins = emitter.extract(st.leftCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
+                        val sameCoins = st.leftCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
+                        val someCoins = sameCoins + externalCoins
+                        eFrom.accept(someCoins)
+                        someCoins.forEach { localCoins.remove(it.id) }
+                        emitter.accept(localCoins.values)
+                        if (!selfIds.contains(e.from)) {
+                            emitter.free(someCoins)
+                        } else {
+                            //do nothing
+                        }
+                    }
+
+                    val creditOk = (fromAmount - eAmount == eFrom.overall)
+                    val debitOk = (toAmount + eAmount == eTo.overall)
+                    val fromCoins = eFrom.coins
+                    val toCoins = eTo.coins
+                    val uniqOk = if (fromCoins != null) {
+                        if (toCoins != null) {
+                            fromCoins.map { it.id }.toSet().intersect(toCoins.map { it.id }.toSet()).isEmpty()
+                        } else {
+                            true
+                        }
+                    } else {
+                        true
+                    }
+                    if (debitOk && creditOk && uniqOk) {
+                        true
+                    } else {
+                        TODO()
+                        false
+                    }
+                }?.let { ok ->
+                    if (ok) {
+                        ticketService.offerLast(bankTicketChannelName, ExchangeSuccessEvent(UUID.randomUUID(), e))
+                    } else {
+                        ticketService.offerLast(bankTicketChannelName, ExchangeFailedEvent(UUID.randomUUID(), e, "exchange strategy failed"))
+                    }
+                    ok
+                } ?: true
+            }
+            is ExchangeSuccessEvent -> {
+                globalFutureMap.get(e.parentEvent?.id)!!.let {
+                    globalFutureMap.remove(e.parentEvent?.id)
+                    it.complete(true)
+                }
+                true
+            }
+            is ExchangeFailedEvent -> {
+                globalFutureMap.get(e.parentEvent?.id)?.let {
+                    globalFutureMap.remove(e.parentEvent?.id)
+                    it.completeExceptionally(ExchangeFailedException(e, e.reason))
+                }
+                true
+            }
+            is BankPauseOnRecalcEvent -> !e.paused!!
+            else -> throw IllegalArgumentException("wrong event type ${e.javaClass.name}")
+        }
+    }
+
+    fun poll(e: BankEvent): Boolean {
+        return runBlocking {
+            try {
+                pollLater(e)
+            } catch (t: Throwable) {
+                log.error("error in event", t)
+                true
+            }
+        }
+    }
+
+    val maxPollCount = 1024
+
+    @PostConstruct
+    fun start() {
+        GlobalScope.launch {
+            do {
+                tick()
+                delay(200)
+            } while (true)
+        }
+        emitter.listen { e ->
+            log.info("emitter event ${e.javaClass.name}")
+            when (e) {
+                is EmitterStartRecalculationEvent -> {
+                    ticketService.offerLast(bankTicketChannelName, BankPauseOnRecalcEvent(UUID.randomUUID(), e))
+                    runBlocking {
+                        do {
+                            delay(500)
+                        } while (!(ticketService.peekFirst(bankTicketChannelName) is BankPauseOnRecalcEvent))
+                    }
+                }
+                is EmitterStopRecalculationEvent -> {
+                    accounts.map { it to it.coins?.map { it.id }?.intersect(e.nullCoins) }.filter { it.second?.isNotEmpty() ?: false }.forEach { ap ->
+                        val nullCoins = ap.second?.map {
+                            //(ap.first as DemoAccount).internalCoins.get(it)?.incEra() //TODO убрать!
+                            ap.first.extractOne(it)
+                        }?.toSet() ?: setOf()
+                        emitter.accept(nullCoins)
+                    }
+                    ticketService.peekFirst(bankTicketChannelName) { (it as BankPauseOnRecalcEvent).paused = false; it }
+                }
+            }
+        }
+    }
+
+    fun tick() {
+        var pollCount = 0
+        do {
+            val e = ticketService.peekFirst(bankTicketChannelName)
+            if (e != null && e is AbstractBankEvent) {
+                if (poll(e)) {
+                    ticketService.remove(e)
+                }
+            }
+            pollCount++
+        } while (e != null && pollCount < maxPollCount)
+    }
+
+}
+
+abstract class AbstractBankEvent : BankEvent()
+
+abstract class ExchangeBankEvent : AbstractBankEvent()
+
+class ExchangeSuccessEvent() : ExchangeBankEvent() {
+
+    var parentEvent: ExchangeStartEvent? = null
+
+    constructor(id: UUID, parentEvent: ExchangeStartEvent) : this() {
+        this.id = id
+        this.parentEvent = parentEvent
+    }
+}
+
+class ExchangeFailedEvent() : ExchangeBankEvent() {
+
+    var reason: String? = null
+    var parentEvent: ExchangeStartEvent? = null
+
+    constructor(id: UUID, parentEvent: ExchangeStartEvent, reason: String) : this() {
+        this.id = id
+        this.parentEvent = parentEvent
+        this.reason = reason
+    }
+}
+
+class ExchangeFailedException(val event: ExchangeBankEvent, message: String? = "exchange failed") : Exception(message)
+
+class ExchangeStartEvent() : ExchangeBankEvent() {
+    var to: UUID? = null
+    var from: UUID? = null
+    var amount: Long? = null
+
+    constructor(id: UUID, to: BankAccount, from: BankAccount, amount: Long) : this() {
+        this.id = id
+        this.to = to.id
+        this.from = from.id
+        this.amount = amount
+    }
+}
+
+class BankPauseOnRecalcEvent() : AbstractBankEvent() {
+
+    var emitterEvent: EmitterStartRecalculationEvent? = null
+
+    var paused: Boolean? = true
+
+    constructor(id: UUID, emitterEvent: EmitterStartRecalculationEvent, paused: Boolean = true) : this() {
+        this.id = id
+        this.emitterEvent = emitterEvent
+        this.paused = paused
+    }
+}

+ 3 - 3
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/GlobalEmitterImpl.kt

@@ -98,11 +98,11 @@ class GlobalEmitterImpl : GlobalEmitter {
         }.id
     }
 
-    override fun emit(count: Long, value: CoinValue) {
+    override fun emit(count: Long, value: CoinValue, era: Int) {
         (0 until count).forEach {
-            computeIfAbsent { coinRepo.create(value, 0) }
+            computeIfAbsent { coinRepo.create(value, era) }
         }
-        log.debug("emitter emit ${count} of ${value}")
+        log.debug("emitter emit ${count} of ${value}-${era}")
     }
 
     override fun getOverall(): Long {

+ 14 - 389
src/main/kotlin/inn/ocsf/bee/freigeld/core/demo/DemoInMem.kt

@@ -1,21 +1,15 @@
 package inn.ocsf.bee.freigeld.core.demo
 
-import com.oblac.nomen.Nomen
-import inn.ocsf.bee.freigeld.core.calc.CoinStrategy
 import inn.ocsf.bee.freigeld.core.calc.CoinUtils
-import inn.ocsf.bee.freigeld.core.data.CentralBankAccountLevel
-import inn.ocsf.bee.freigeld.core.model.*
-import inn.ocsf.bee.freigeld.core.model.data.PersonData
-import kotlinx.coroutines.*
+import inn.ocsf.bee.freigeld.core.model.BankAccount
+import inn.ocsf.bee.freigeld.core.model.CentralBank
+import inn.ocsf.bee.freigeld.core.model.GlobalEmitter
+import inn.ocsf.bee.freigeld.core.model.GlobalWorld
 import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Service
-import java.util.*
-import java.util.concurrent.CompletableFuture
 import javax.annotation.PostConstruct
-import kotlin.math.absoluteValue
-import kotlin.math.roundToLong
 
 @Service
 class DemoInMem {
@@ -43,7 +37,11 @@ class DemoInMem {
         emitter.emit(25, CoinValue.bi)
         emitter.emit(10, CoinValue.mega)
 */
-        bank.exchange(bank.getSelfAccount(), emitter, 1_300_000)
+        CoinUtils.avaiues.flatMap { it.value }.forEach { c ->
+            emitter.emit((10..40L).random(), c.first, c.second)
+        }
+        bank.exchange(bank.getSelfAccount(), emitter, 2_300_000)
+        /*
         (0 until 100).map {
             val newHuman: NaturalPerson = PersonData.NaturalPersonImpl(Nomen.randomName())
             val oldHuman = world.getPersonByIdentity(PersonIdentityFullName(newHuman.fullName))
@@ -61,12 +59,13 @@ class DemoInMem {
                     .thenRun({ log.info("exchange ok") })
                     .exceptionally({ log.error("exchange error"); null })
         }
+         */
     }
 
     private fun doRand0() {
         val personIds = world.getPersonIds()
-        val fromPerson = personIds.random()
-        val toPerson = personIds.subtract(setOf(fromPerson)).random()
+        val fromPerson = personIds.subtract(setOf(bank.id)).random()
+        val toPerson = personIds.subtract(setOf(bank.id, fromPerson)).random()
         val fromAccount = bank.getAccounts(fromPerson).first()
         val toAccout = bank.getAccounts(toPerson).first()
         val dir = if (fromAccount.overall >= toAccout.overall) fromAccount to toAccout else toAccout to fromAccount
@@ -74,9 +73,9 @@ class DemoInMem {
         bank.exchange(dir.second, dir.first, x).thenRun({ log.info("exchange ok") }).exceptionally({ log.error("exchange error ${it.message}"); null })
     }
 
-    @Scheduled(initialDelay = 5 * 1000L, fixedDelay = 500L)
+    @Scheduled(initialDelay = 30 * 1000L, fixedDelay = 2000L)
     fun tick() {
-        doRand0()
+        //doRand0()
         log.info("tick")
     }
 
@@ -91,378 +90,4 @@ fun CentralBank.getSelfAccount(): BankAccount {
     return this.getAccounts(this.id).first()
 }
 
-@Service
-class GlobalBankDemo : CentralBankAccountLevel() {
-
-    @Autowired
-    private lateinit var emitter: GlobalEmitter
-
-    //val accountMap = mutableMapOf<UUID, BankAccount>()
-    //val personToAccountTable = mutableSetOf<Pair<UUID, UUID>>()
-    val globalQueue: Deque<BankEvent> = ArrayDeque()
-    val globalFutureMap = mutableMapOf<UUID, CompletableDeferred<Any>>()
-    val coinToAccountMap = mutableMapOf<UUID, UUID>()
-
-    private val log = LoggerFactory.getLogger(javaClass)
-
-    override fun exchange(to: BankAccount, from: BankAccount, amount: Long): CompletableFuture<Any> {
-        val e = ExchangeStartEvent(UUID.randomUUID(), to, from, amount)
-        val ret = CompletableFuture<Any>()
-        globalFutureMap.computeIfAbsent(e.id) { id ->
-            val def = CompletableDeferred<Any>()
-            def.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(null) else ret.completeExceptionally(t) }
-            def
-        }
-        globalQueue.offerLast(e)
-        return ret
-    }
-
-    suspend fun pollLater(e: BankEvent): Boolean {
-        return when (e) {
-            is ExchangeStartEvent -> {
-                CoinUtils.makeSomeStrategy(CoinStrategy(e.amount, if (e.from.id != emitter.id) e.from.coins else null, if (e.from.id != selfId) e.to.coins else null)).let { st ->
-                    if (st.res!!.isOk()) {
-                        st
-                    } else if (st.res!!.isErr()) {
-                        globalQueue.offerLast(ExchangeFailedEvent(UUID.randomUUID(), e, "strategy error ${st.res}"))
-                        null
-                    } else {
-                        TODO()
-                    }
-                }?.let { st ->
-                    log.info("${selfId} do ${st.res?.name} transaction: ${e.from} -> ${e.to} = ${e.amount}")
-                    var fromAmount = e.from.overall
-                    var toAmount = e.to.overall
-
-                    val leftExtracted = if (st.leftExtract != null) {
-                        val old = e.from.extract(st.leftExtract?.map { it.id }!!)
-                        //emitter.accept(old) later
-                        old
-                    } else {
-                        listOf()
-                    }
-
-                    if (st.leftEmit != null) {
-                        val newIds = emitter.emit(st.leftEmit!!)
-                        val newCoins = emitter.extract(newIds)
-                        e.from.accept(newCoins)
-                        if (e.from.id != selfId) {
-                            emitter.free(newCoins)
-                        } else {
-                            //do nothing
-                        }
-                    }
-
-                    val rightExtracted = if (st.rightExtract != null) {
-                        val old = e.to.extract(st.rightExtract?.map { it.id }!!)
-                        //emitter.accept(old) later
-                        old
-                    } else {
-                        listOf()
-                    }
-
-                    if (st.rightEmit != null) {
-                        val newIds = emitter.emit(st.rightEmit!!)
-                        if (e.from.id == emitter.id) {
-                            fromAmount = e.from.overall
-                        }
-                        val newCoins = emitter.extract(newIds)
-                        e.to.accept(newCoins)
-                        if (e.to.id != selfId) {
-                            emitter.free(newCoins)
-                        } else {
-                            //do nothing
-                        }
-                    }
-
-                    if (st.rightCashback != null) {
-                        val localCoins = leftExtracted.map { it.id to it }.toMap().toMutableMap()
-                        val externalCoins = emitter.extract(st.rightCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
-                        val sameCoins = st.rightCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
-                        val someCoins = sameCoins + externalCoins
-                        e.to.accept(someCoins)
-                        someCoins.forEach { localCoins.remove(it.id) }
-                        emitter.accept(localCoins.values)
-                        if (e.to.id != selfId) {
-                            emitter.free(someCoins)
-                        } else {
-                            //do nothing
-                        }
-                    }
-
-                    if (st.leftCashback != null) {
-                        val localCoins = rightExtracted.map { it.id to it }.toMap().toMutableMap()
-                        val externalCoins = emitter.extract(st.leftCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
-                        val sameCoins = st.leftCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
-                        val someCoins = sameCoins + externalCoins
-                        e.from.accept(someCoins)
-                        someCoins.forEach { localCoins.remove(it.id) }
-                        emitter.accept(localCoins.values)
-                        if (e.from.id != selfId) {
-                            emitter.free(someCoins)
-                        } else {
-                            //do nothing
-                        }
-                    }
-
-                    val creditOk = (fromAmount - e.amount == e.from.overall)
-                    val debitOk = (toAmount + e.amount == e.to.overall)
-                    val fromCoins = e.from.coins
-                    val toCoins = e.to.coins
-                    val uniqOk = if (fromCoins != null) {
-                        if (toCoins != null) {
-                            fromCoins.map { it.id }.toSet().intersect(toCoins.map { it.id }.toSet()).isEmpty()
-                        } else {
-                            true
-                        }
-                    } else {
-                        true
-                    }
-                    if (debitOk && creditOk && uniqOk) {
-                        true
-                    } else {
-                        TODO()
-                        false
-                    }
-                }?.let { ok ->
-                    if (ok) {
-                        globalQueue.offerLast(ExchangeSuccessEvent(UUID.randomUUID(), e))
-                    } else {
-                        globalQueue.offerLast(ExchangeFailedEvent(UUID.randomUUID(), e, "exchange strategy failed"))
-                    }
-                    ok
-                } ?: true
-            }
-            is ExchangeSuccessEvent -> {
-                globalFutureMap.get(e.parentEvent.id)!!.let {
-                    globalFutureMap.remove(e.parentEvent.id)
-                    it.complete(true)
-                }
-                true
-            }
-            is ExchangeFailedEvent -> {
-                globalFutureMap.get(e.parentEvent.id)?.let {
-                    globalFutureMap.remove(e.parentEvent.id)
-                    it.completeExceptionally(ExchangeFailedException(e, e.reason))
-                }
-                true
-            }
-            is BankPauseOnRecalcEvent -> !e.paused
-            else -> throw IllegalArgumentException("wrong event type ${e.javaClass.name}")
-        }
-    }
-
-    fun poll(e: BankEvent): Boolean {
-        return runBlocking {
-            try {
-                pollLater(e)
-            } catch (t: Throwable) {
-                log.error("error in event", t)
-                true
-            }
-        }
-    }
-
-    val maxPollCount = 1024
-
-    @PostConstruct
-    fun start() {
-        GlobalScope.launch {
-            do {
-                tick()
-                delay(200)
-            } while (true)
-        }
-        emitter.listen { e ->
-            log.info("emitter event ${e.javaClass.name}")
-            when (e) {
-                is EmitterStartRecalculationEvent -> {
-                    globalQueue.offerLast(BankPauseOnRecalcEvent(UUID.randomUUID(), e))
-                    runBlocking {
-                        do {
-                            delay(500)
-                        } while (!(globalQueue.peekFirst() is BankPauseOnRecalcEvent))
-                    }
-                }
-                is EmitterStopRecalculationEvent -> {
-                    accounts.map { it to it.coins?.map { it.id }?.intersect(e.nullCoins) }.filter { it.second?.isNotEmpty() ?: false }.forEach { ap ->
-                        val nullCoins = ap.second?.map {
-                            //(ap.first as DemoAccount).internalCoins.get(it)?.incEra() //TODO убрать!
-                            ap.first.extractOne(it)
-                        }?.toSet() ?: setOf()
-                        emitter.accept(nullCoins)
-                    }
-                    globalQueue.filter { it is BankPauseOnRecalcEvent }.map { it as BankPauseOnRecalcEvent }.first().paused = false
-                }
-            }
-        }
-    }
-
-    fun tick() {
-        var pollCount = 0
-        while (globalQueue.isNotEmpty() && pollCount < maxPollCount) {
-            val e = globalQueue.element()
-            if (poll(e)) {
-                globalQueue.remove(e)
-            }
-            pollCount++
-        }
-    }
-
-    class DemoOwnerAccout(val inner: BankAccount, val owner: GlobalBankDemo) : BankAccount {
-
-        override fun getOverall(): Long {
-            return inner.overall
-        }
-
-        override fun getCoins(): MutableCollection<Coin>? {
-            return inner.coins
-        }
-
-        override fun getOwnerId(): UUID {
-            TODO("Not yet implemented")
-        }
-
-        override fun acceptOne(coin: Coin) {
-            if (owner.coinToAccountMap.containsKey(coin.id)) {
-                throw RuntimeException("already owned")
-            } else {
-                owner.coinToAccountMap.put(coin.id, this.id)
-            }
-            inner.acceptOne(coin)
-        }
-
-        override fun getId(): UUID {
-            return inner.id
-        }
-
-        override fun extractOne(coinId: UUID): Coin? {
-            if (owner.coinToAccountMap.remove(coinId) == null) {
-                throw RuntimeException("owner missing")
-            }
-            return inner.extractOne(coinId)
-        }
-
-        override fun toString(): String {
-            return "*${inner}"
-        }
-
-    }
-}
-
-abstract class AbstractBankEvent(val eventId: UUID) : BankEvent {
-
-    override fun getId(): UUID {
-        return eventId
-    }
-
-    override fun equals(other: Any?): Boolean {
-        if (this === other) return true
-        if (javaClass != other?.javaClass) return false
-
-        other as AbstractBankEvent
-
-        if (eventId != other.eventId) return false
-
-        return true
-    }
-
-    override fun hashCode(): Int {
-        return eventId.hashCode()
-    }
-
-}
-
-abstract class ExchangeBankEvent(eventId: UUID) : AbstractBankEvent(eventId)
-
-class ExchangeSuccessEvent(eventId: UUID, val parentEvent: ExchangeStartEvent) : ExchangeBankEvent(eventId)
-
-class ExchangeFailedEvent(eventId: UUID, val parentEvent: ExchangeStartEvent, val reason: String) : ExchangeBankEvent(eventId)
-
-class ExchangeFailedException(val event: ExchangeBankEvent, message: String = "exchange failed") : Exception(message)
-
-class ExchangeStartEvent(eventId: UUID, val to: BankAccount, val from: BankAccount, val amount: Long, var retry: Boolean = false) : ExchangeBankEvent(eventId)
-
-class BankPauseOnRecalcEvent(eventId: UUID, val emitterEvent: EmitterStartRecalculationEvent, var paused: Boolean = true) : AbstractBankEvent(eventId)
-
-
-open class DemoAccount(val accountId: UUID, val personId: UUID) : BankAccount {
-
-    val internalCoins = mutableMapOf<UUID, Coin>()
-    var _overall: Long? = null
-
-    private val log = LoggerFactory.getLogger(javaClass)
-
-    override fun getId(): UUID {
-        return accountId
-    }
-
-    override fun extractOne(coinId: UUID): Coin {
-        return if (internalCoins.containsKey(coinId)) {
-            val coin = internalCoins.remove(coinId)!!
-            if (coin.current == 0L) {
-                _overall = null
-                log.debug("${this} leave coin ${coin.current}")
-            } else {
-                decOverall(coin.current)
-                log.debug("${this} credit coin ${coin.current}")
-            }
-            coin
-        } else {
-            throw IllegalArgumentException("no such coin")
-        }
-    }
-
-    override fun getOverall(): Long {
-        return if (_overall != null) {
-            _overall
-        } else {
-            _overall = internalCoins.values.map { it.current }.onEach { if (it == 0L) throw RuntimeException("illegal value 0") }.sum()
-            _overall
-        } ?: 0L
-    }
-
-    override fun getCoins(): MutableCollection<Coin> {
-        return internalCoins.values
-    }
-
-    override fun getOwnerId(): UUID {
-        return personId
-    }
-
-    private fun decOverall(amount: Long) {
-        if (amount == 0L) throw IllegalArgumentException("should not be 0")
-        if (_overall != null) {
-            _overall = _overall!! - amount
-        } else {
-            _overall = -amount
-        }
-    }
-
-    private fun incOverall(amount: Long) {
-        if (amount == 0L) throw IllegalArgumentException("should not be 0")
-        if (_overall != null) {
-            _overall = _overall!! + amount
-        } else {
-            _overall = amount
-        }
-    }
-
-    override fun acceptOne(coin: Coin) {
-        if (coin.id != null && !internalCoins.containsKey(coin.id)) {
-            internalCoins.put(coin.id, coin)
-            incOverall(coin.current)
-        } else if (coin.id == null) {
-            throw IllegalArgumentException("fake coin is not acceptable")
-        } else if (internalCoins.containsKey(coin.id)) {
-            throw IllegalArgumentException("same coin is not acceptable")
-        }
-        log.debug("${this} debit coin ${CoinUtils.sum(listOf(coin))}")
-    }
-
-
-    override fun toString(): String {
-        return accountId.hashCode().absoluteValue.toString(16)
-    }
-}
 

+ 49 - 0
src/main/kotlin/inn/ocsf/bee/freigeld/core/service/TicketService.kt

@@ -0,0 +1,49 @@
+package inn.ocsf.bee.freigeld.core.service
+
+import inn.ocsf.bee.freigeld.core.model.Ticket
+import inn.ocsf.bee.freigeld.core.model.TicketStatus
+import inn.ocsf.bee.freigeld.core.model.data.TicketData
+import inn.ocsf.bee.freigeld.core.repo.TicketRepository
+import org.springframework.stereotype.Service
+import java.util.*
+import javax.annotation.PostConstruct
+import javax.inject.Inject
+
+@Service
+class TicketService {
+
+    @Inject
+    private lateinit var ticketRepo: TicketRepository
+
+    @PostConstruct
+    fun init() {
+        ticketRepo.deleteAll()
+    }
+
+    fun remove(e: Ticket) {
+        val td = ticketRepo.findOneByTicketId(e.id).orElseThrow { RuntimeException("event ${e.id} not found") }
+        td.status = TicketStatus.completed
+        ticketRepo.save(td)
+    }
+
+    fun offerLast(topic: String, e: Ticket) {
+        val td = TicketData()
+        td.status = TicketStatus.created
+        td.ts = Date()
+        td.topic = topic
+        td.ticket = e
+        ticketRepo.save(td)
+    }
+
+    fun peekFirst(topic: String, modifyFn: ((Ticket) -> Ticket)? = null): Ticket? {
+        return ticketRepo.findFirstByTopicAndStatusInOrderByIdAsc(topic, listOf(TicketStatus.created)).orElse(null)?.let {
+            if (modifyFn != null) {
+                it.ticket = modifyFn(it.ticket)
+                ticketRepo.save(it)
+                it.ticket
+            } else {
+                it.ticket
+            }
+        }
+    }
+}

+ 82 - 0
src/test/kotlin/inn/ocsf/bee/freigeld/core/demo/CoinUtilsTest.kt

@@ -1,15 +1,97 @@
 package inn.ocsf.bee.freigeld.core.demo
 
 import inn.ocsf.bee.freigeld.core.calc.CoinUtils
+import inn.ocsf.bee.freigeld.core.model.BankAccount
 import inn.ocsf.bee.freigeld.core.model.Coin
 import inn.ocsf.bee.freigeld.core.model.CoinValue
 import org.junit.jupiter.api.RepeatedTest
 import org.junit.jupiter.api.Test
 import org.slf4j.LoggerFactory
 import java.util.*
+import kotlin.math.absoluteValue
 import kotlin.test.assertEquals
 import kotlin.test.assertFalse
 
+open class DemoAccount(val accountId: UUID, val personId: UUID) : BankAccount {
+
+    val internalCoins = mutableMapOf<UUID, Coin>()
+    var _overall: Long? = null
+
+    private val log = LoggerFactory.getLogger(javaClass)
+
+    override fun getId(): UUID {
+        return accountId
+    }
+
+    override fun extractOne(coinId: UUID): Coin {
+        return if (internalCoins.containsKey(coinId)) {
+            val coin = internalCoins.remove(coinId)!!
+            if (coin.current == 0L) {
+                _overall = null
+                log.debug("${this} leave coin ${coin.current}")
+            } else {
+                decOverall(coin.current)
+                log.debug("${this} credit coin ${coin.current}")
+            }
+            coin
+        } else {
+            throw IllegalArgumentException("no such coin")
+        }
+    }
+
+    override fun getOverall(): Long {
+        return if (_overall != null) {
+            _overall
+        } else {
+            _overall = internalCoins.values.map { it.current }.onEach { if (it == 0L) throw RuntimeException("illegal value 0") }.sum()
+            _overall
+        } ?: 0L
+    }
+
+    override fun getCoins(): MutableCollection<Coin> {
+        return internalCoins.values
+    }
+
+    override fun getOwnerId(): UUID {
+        return personId
+    }
+
+    private fun decOverall(amount: Long) {
+        if (amount == 0L) throw IllegalArgumentException("should not be 0")
+        if (_overall != null) {
+            _overall = _overall!! - amount
+        } else {
+            _overall = -amount
+        }
+    }
+
+    private fun incOverall(amount: Long) {
+        if (amount == 0L) throw IllegalArgumentException("should not be 0")
+        if (_overall != null) {
+            _overall = _overall!! + amount
+        } else {
+            _overall = amount
+        }
+    }
+
+    override fun acceptOne(coin: Coin) {
+        if (coin.id != null && !internalCoins.containsKey(coin.id)) {
+            internalCoins.put(coin.id, coin)
+            incOverall(coin.current)
+        } else if (coin.id == null) {
+            throw IllegalArgumentException("fake coin is not acceptable")
+        } else if (internalCoins.containsKey(coin.id)) {
+            throw IllegalArgumentException("same coin is not acceptable")
+        }
+        log.debug("${this} debit coin ${CoinUtils.sum(listOf(coin))}")
+    }
+
+
+    override fun toString(): String {
+        return accountId.hashCode().absoluteValue.toString(16)
+    }
+}
+
 private class TestCoin(val coinId: UUID, val initialValue: CoinValue, var _era: Int = 0) : Coin {
 
     override fun getId(): UUID = coinId