Forráskód Böngészése

полный переезд в монгу

kpmy 5 éve
szülő
commit
8a48934bf4

+ 11 - 0
db/config.md

@@ -0,0 +1,11 @@
+docker run --name mongodb -p 27017:27017 -v /var/docker/mongodb/data:/data/db -d mongo:4.2 mongod --replSet frei0
+
+rs.initiate()
+
+rs.conf()
+
+cfg = rs.conf()
+
+cfg.members[0].host = "roof:27017"
+
+ rs.reconfig(cfg)

+ 14 - 0
pom.xml

@@ -49,6 +49,12 @@
             <version>${spring.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
+            <version>${spring.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.querydsl</groupId>
             <artifactId>querydsl-mongodb</artifactId>
@@ -61,6 +67,7 @@
             <version>${querydsl.version}</version>
         </dependency>
 
+
         <dependency>
             <groupId>org.jetbrains.kotlin</groupId>
             <artifactId>kotlin-stdlib-jdk8</artifactId>
@@ -131,6 +138,13 @@
             <version>1.8.0</version>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.10</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/Bank.java

@@ -20,7 +20,7 @@ public interface Bank extends PrivatePerson {
     Collection<BankAccount> getAccounts(@NotNull UUID person);
 
     @NotNull
-    CompletableFuture<Object> exchange(@NotNull BankAccount to, BankAccount from, @NotNull Long amount);
+    CompletableFuture<Ticket> exchange(@NotNull BankAccount to, @NotNull BankAccount from, @NotNull Long amount);
 
     @NotNull
     Stream<BankAccount> getAccounts();

+ 1 - 1
src/main/java/inn/ocsf/bee/freigeld/core/model/Emitter.java

@@ -32,6 +32,6 @@ public interface Emitter {
 
     void listen(@NotNull Consumer<EmitterEvent> fn);
 
-    CompletableFuture<Object> calc();
+    CompletableFuture<EmitterEvent> calc();
 
 }

+ 26 - 0
src/main/java/inn/ocsf/bee/freigeld/core/model/data/StateObject.java

@@ -0,0 +1,26 @@
+package inn.ocsf.bee.freigeld.core.model.data;
+
+import org.springframework.data.annotation.Id;
+import org.springframework.data.annotation.Version;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.UUID;
+
+@Document("state")
+public abstract class StateObject {
+
+    @Id
+    private UUID id;
+
+    @Version
+    private Long version;
+
+    public UUID getId() {
+        return id;
+    }
+
+    public void setId(UUID id) {
+        this.id = id;
+    }
+
+}

+ 8 - 0
src/main/java/inn/ocsf/bee/freigeld/core/repo/ReactiveTicketRepository.java

@@ -0,0 +1,8 @@
+package inn.ocsf.bee.freigeld.core.repo;
+
+import inn.ocsf.bee.freigeld.core.model.data.TicketData;
+import org.bson.types.ObjectId;
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+
+public interface ReactiveTicketRepository extends ReactiveMongoRepository<TicketData, ObjectId> {
+}

+ 9 - 0
src/main/java/inn/ocsf/bee/freigeld/core/repo/StateRepository.java

@@ -0,0 +1,9 @@
+package inn.ocsf.bee.freigeld.core.repo;
+
+import inn.ocsf.bee.freigeld.core.model.data.StateObject;
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.UUID;
+
+public interface StateRepository extends MongoRepository<StateObject, UUID> {
+}

+ 13 - 1
src/main/kotlin/inn/ocsf/bee/freigeld/core/FreiApp.kt

@@ -1,14 +1,17 @@
 package inn.ocsf.bee.freigeld.core
 
+import inn.ocsf.bee.freigeld.core.demo.DemoInMem
 import org.springframework.boot.SpringApplication
 import org.springframework.boot.autoconfigure.SpringBootApplication
 import org.springframework.context.annotation.Profile
 import org.springframework.context.annotation.PropertySource
 import org.springframework.scheduling.annotation.EnableScheduling
 import org.springframework.stereotype.Service
+import org.springframework.web.bind.annotation.RequestMapping
 import org.springframework.web.bind.annotation.RestController
 import sun.misc.Unsafe
 import javax.annotation.PostConstruct
+import javax.inject.Inject
 
 @SpringBootApplication
 @EnableScheduling
@@ -22,7 +25,16 @@ class FreiApp{
 }
 
 @RestController
-class DevController
+class DevController {
+
+    @Inject
+    private lateinit var demo: DemoInMem
+
+    @RequestMapping("dev/doEx0")
+    fun doEx0() {
+        demo.tick()
+    }
+}
 
 @Service
 @Profile("dev")

+ 38 - 3
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/CentralBankAccountLevel.kt

@@ -9,10 +9,13 @@ import inn.ocsf.bee.freigeld.core.model.data.BankAccountData
 import inn.ocsf.bee.freigeld.core.model.data.CoinAccountLink
 import inn.ocsf.bee.freigeld.core.model.data.PersonData
 import inn.ocsf.bee.freigeld.core.model.data.QBankAccountData.bankAccountData
+import inn.ocsf.bee.freigeld.core.model.data.StateObject
 import inn.ocsf.bee.freigeld.core.repo.BankAccountRepository
 import inn.ocsf.bee.freigeld.core.repo.CoinAccountLinkRepository
 import inn.ocsf.bee.freigeld.core.repo.CoinRepository
+import inn.ocsf.bee.freigeld.core.repo.StateRepository
 import org.slf4j.LoggerFactory
+import org.springframework.transaction.annotation.Transactional
 import java.util.*
 import java.util.stream.Stream
 import java.util.stream.StreamSupport
@@ -33,6 +36,9 @@ abstract class CentralBankAccountLevel : CentralBank {
     @Inject
     private lateinit var linkRepo: CoinAccountLinkRepository
 
+    @Inject
+    private lateinit var stateRepo: StateRepository
+
     protected val bankId = UUID.fromString("a8486777-834c-4267-9f10-69f03714c523")
 
     private val log = LoggerFactory.getLogger(javaClass)
@@ -66,15 +72,31 @@ abstract class CentralBankAccountLevel : CentralBank {
         return WrappedBankAccount(account as BankAccountData.BankAccountDefault, delegate)
     }
 
+    @Transactional
+    open fun <T> getCurrentState(fn: (CentralBankStateObject) -> T? = { null }): T? {
+        return stateRepo.findById(bankId).orElseGet { stateRepo.save(CentralBankStateObject(bankId)) }.let {
+            try {
+                val ret = fn(it as CentralBankStateObject)
+                stateRepo.save(it)
+                ret
+            } catch (e: Exception) {
+                log.error("error while access state", e)
+                null
+            }
+        }
+    }
+
     @PostConstruct
-    fun init() {
-        accountRepo.deleteAll()
-        linkRepo.deleteAll()
+    @Transactional
+    open fun init() {
+        //accountRepo.deleteAll()
+        //linkRepo.deleteAll()
         val thisBank = world.getPerson(bankId) ?: world.addPerson(PersonData.BankPrivate(bankId))
         val thisAccounts = getAccounts(bankId)
         if (thisAccounts.isEmpty()) {
             addAccount(bankId)
         }
+        getCurrentState { }
     }
 
     override fun addAccount(personId: UUID): BankAccount {
@@ -178,3 +200,16 @@ private class WrappedBankAccount(val inner: BankAccountData.BankAccountDefault,
         return inner.toString()
     }
 }
+
+enum class BankQueueState {
+    open, recalc, closed;
+}
+
+class CentralBankStateObject() : StateObject() {
+
+    var queueState: BankQueueState? = BankQueueState.open
+
+    constructor(id: UUID) : this() {
+        this.id = id
+    }
+}

+ 55 - 67
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/CentralBankQueueLevel.kt

@@ -4,10 +4,11 @@ import inn.ocsf.bee.freigeld.core.calc.CoinStrategy
 import inn.ocsf.bee.freigeld.core.calc.CoinUtils
 import inn.ocsf.bee.freigeld.core.model.*
 import inn.ocsf.bee.freigeld.core.service.TicketService
-import kotlinx.coroutines.*
+import kotlinx.coroutines.CompletableDeferred
 import org.slf4j.LoggerFactory
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
 import java.util.*
 import java.util.concurrent.CompletableFuture
 import javax.annotation.PostConstruct
@@ -26,23 +27,24 @@ class CentralBankQueueLevel : CentralBankAccountLevel() {
     @Inject
     private lateinit var ticketService: TicketService
 
-    val globalFutureMap = mutableMapOf<UUID, CompletableDeferred<Any>>()
+    private val globalFutureMap = mutableMapOf<UUID, CompletableDeferred<Ticket>>()
 
     private val log = LoggerFactory.getLogger(javaClass)
 
-    override fun exchange(to: BankAccount, from: BankAccount, amount: Long): CompletableFuture<Any> {
+    override fun exchange(to: BankAccount, from: BankAccount, amount: Long): CompletableFuture<Ticket> {
         val e = ExchangeStartEvent(UUID.randomUUID(), to, from, amount)
-        val ret = CompletableFuture<Any>()
+        val ret = CompletableFuture<Ticket>()
         globalFutureMap.computeIfAbsent(e.id) { id ->
-            val def = CompletableDeferred<Any>()
-            def.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(null) else ret.completeExceptionally(t) }
+            ticketService.offerLast(bankTicketChannelName, e)
+            val def = CompletableDeferred<Ticket>()
+            def.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(e) else ret.completeExceptionally(t) }
             def
         }
-        ticketService.offerLast(bankTicketChannelName, e)
         return ret
     }
 
-    suspend fun pollLater(e: BankEvent): Boolean {
+    @Transactional
+    fun pollLater(e: BankEvent): Boolean {
         return when (e) {
             is ExchangeStartEvent -> {
                 val eFrom = e.from?.let { getAccount(it) } ?: emitter
@@ -163,83 +165,62 @@ class CentralBankQueueLevel : CentralBankAccountLevel() {
                 } ?: true
             }
             is ExchangeSuccessEvent -> {
-                globalFutureMap.get(e.parentEvent?.id)!!.let {
-                    globalFutureMap.remove(e.parentEvent?.id)
-                    it.complete(true)
+                e.parentEvent?.let { p ->
+                    globalFutureMap.get(p.id)?.let {
+                        globalFutureMap.remove(p.id)
+                        it.complete(p)
+                    }
                 }
                 true
             }
             is ExchangeFailedEvent -> {
-                globalFutureMap.get(e.parentEvent?.id)?.let {
-                    globalFutureMap.remove(e.parentEvent?.id)
-                    it.completeExceptionally(ExchangeFailedException(e, e.reason))
+                e.parentEvent?.let { p ->
+                    globalFutureMap.get(p.id)?.let {
+                        globalFutureMap.remove(e.parentEvent?.id)
+                        it.completeExceptionally(ExchangeFailedException(e, e.reason))
+                    }
                 }
                 true
             }
-            is BankPauseOnRecalcEvent -> !e.paused!!
-            else -> throw IllegalArgumentException("wrong event type ${e.javaClass.name}")
-        }
-    }
-
-    fun poll(e: BankEvent): Boolean {
-        return runBlocking {
-            try {
-                pollLater(e)
-            } catch (t: Throwable) {
-                log.error("error in event", t)
+            is BankPauseOnRecalcEvent -> {
+                getCurrentState { it.queueState = BankQueueState.recalc }
                 true
             }
+            is BankResumeAfterRecalcEvent -> {
+                accounts.map { it to it.coins?.map { it.id }?.intersect(e.emitterEvent?.nullCoins ?: setOf()) }.filter { it.second?.isNotEmpty() ?: false }.forEach { ap ->
+                    val nullCoins = ap.second?.map { ap.first.extractOne(it) }?.toSet() ?: setOf()
+                    emitter.accept(nullCoins)
+                }
+                getCurrentState { it.queueState = BankQueueState.open }
+                true
+            }
+            else -> throw IllegalArgumentException("wrong event type ${e.javaClass.name}")
         }
     }
 
-    val maxPollCount = 1024
-
     @PostConstruct
     fun start() {
-        GlobalScope.launch {
-            do {
-                tick()
-                delay(200)
-            } while (true)
-        }
         emitter.listen { e ->
-            log.info("emitter event ${e.javaClass.name}")
+            log.debug("emitter event ${e.javaClass.name}")
             when (e) {
-                is EmitterStartRecalculationEvent -> {
-                    ticketService.offerLast(bankTicketChannelName, BankPauseOnRecalcEvent(UUID.randomUUID(), e))
-                    runBlocking {
-                        do {
-                            delay(500)
-                        } while (!(ticketService.peekFirst(bankTicketChannelName) is BankPauseOnRecalcEvent))
-                    }
-                }
-                is EmitterStopRecalculationEvent -> {
-                    accounts.map { it to it.coins?.map { it.id }?.intersect(e.nullCoins) }.filter { it.second?.isNotEmpty() ?: false }.forEach { ap ->
-                        val nullCoins = ap.second?.map {
-                            //(ap.first as DemoAccount).internalCoins.get(it)?.incEra() //TODO убрать!
-                            ap.first.extractOne(it)
-                        }?.toSet() ?: setOf()
-                        emitter.accept(nullCoins)
-                    }
-                    ticketService.peekFirst(bankTicketChannelName) { (it as BankPauseOnRecalcEvent).paused = false; it }
-                }
+                is EmitterStartRecalculationEvent -> ticketService.offerLast(bankTicketChannelName, BankPauseOnRecalcEvent(UUID.randomUUID(), e))
+                is EmitterStopRecalculationEvent -> ticketService.offerLast(bankTicketChannelName, BankResumeAfterRecalcEvent(UUID.randomUUID(), e))
+                else -> throw RuntimeException("unknown emitter event ${e.javaClass.name}")
             }
         }
-    }
-
-    fun tick() {
-        var pollCount = 0
-        do {
-            val e = ticketService.peekFirst(bankTicketChannelName)
-            if (e != null && e is AbstractBankEvent) {
-                if (poll(e)) {
-                    ticketService.remove(e)
+        ticketService.listen(bankTicketChannelName) { e ->
+            val qst = getCurrentState { it.queueState ?: BankQueueState.open }
+            if (e is AbstractBankEvent && qst != BankQueueState.closed) {
+                if ((qst == BankQueueState.open && !(e is BankResumeAfterRecalcEvent)) || (qst == BankQueueState.recalc && e is BankResumeAfterRecalcEvent)) {
+                    pollLater(e)
+                } else {
+                    false
                 }
+            } else {
+                null
             }
-            pollCount++
-        } while (e != null && pollCount < maxPollCount)
+        }
     }
-
 }
 
 abstract class AbstractBankEvent : BankEvent()
@@ -287,11 +268,18 @@ class BankPauseOnRecalcEvent() : AbstractBankEvent() {
 
     var emitterEvent: EmitterStartRecalculationEvent? = null
 
-    var paused: Boolean? = true
+    constructor(id: UUID, emitterEvent: EmitterStartRecalculationEvent) : this() {
+        this.id = id
+        this.emitterEvent = emitterEvent
+    }
+}
+
+class BankResumeAfterRecalcEvent() : AbstractBankEvent() {
+    var emitterEvent: EmitterStopRecalculationEvent? = null
 
-    constructor(id: UUID, emitterEvent: EmitterStartRecalculationEvent, paused: Boolean = true) : this() {
+    constructor(id: UUID, emitterEvent: EmitterStopRecalculationEvent) : this() {
         this.id = id
         this.emitterEvent = emitterEvent
-        this.paused = paused
     }
 }
+

+ 15 - 12
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/GlobalEmitterImpl.kt

@@ -12,6 +12,7 @@ import kotlinx.coroutines.GlobalScope
 import kotlinx.coroutines.launch
 import org.slf4j.LoggerFactory
 import org.springframework.stereotype.Service
+import org.springframework.transaction.annotation.Transactional
 import java.util.*
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.locks.ReentrantLock
@@ -28,19 +29,19 @@ class GlobalEmitterImpl : GlobalEmitter {
 
     private val globalId = UUID.fromString("a671cdca-782a-4caf-8aad-056f6b62d822")
 
-    val listeners = mutableListOf<Consumer<EmitterEvent>>()
+    private val listeners = mutableListOf<Consumer<EmitterEvent>>()
 
-    val deferredLock = ReentrantLock()
+    private val deferredLock = ReentrantLock()
 
     private val log = LoggerFactory.getLogger(javaClass)
 
     @PostConstruct
     fun init() {
-        coinRepo.deleteAll()
+        //coinRepo.deleteAll()
     }
 
     private fun computeIfAbsent(future: Coin? = null, fn: () -> CoinData): Coin {
-        var ret: Coin? = if (future != null) {
+        val ret: Coin? = if (future != null) {
             coinRepo.findAll(Expressions.allOf(coinData.status.eq(CoinStatus.indoor), coinData.coin.value.eq(future.value), coinData.coin.era.eq(future.era))).map { it.coin }.firstOrNull()
         } else {
             null
@@ -50,7 +51,7 @@ class GlobalEmitterImpl : GlobalEmitter {
             coin = coinRepo.save(coin)
             coin.coin
         } else {
-            log.info("emitter reuse coin ${CoinUtils.sum(listOf(ret))}")
+            log.debug("emitter reuse coin ${CoinUtils.sum(listOf(ret))}")
             ret
         }
     }
@@ -59,7 +60,7 @@ class GlobalEmitterImpl : GlobalEmitter {
         listeners.add(fn)
     }
 
-    private suspend fun calcLater(): Set<UUID> {
+    protected fun calcLater(): Set<UUID> {
         val ret = mutableSetOf<UUID>()
         coinRepo.findAllByStatus(CoinStatus.free).peek {
             it.coin.incEra()
@@ -71,17 +72,19 @@ class GlobalEmitterImpl : GlobalEmitter {
         return ret
     }
 
-    override fun calc(): CompletableFuture<Any> {
-        val ret = CompletableFuture<Any>()
+    @Transactional
+    override fun calc(): CompletableFuture<EmitterEvent> {
+        val ret = CompletableFuture<EmitterEvent>()
         listeners.forEach { it.accept(EmitterStartRecalculationEvent()) }
         deferredLock.withLock {
-            val deferred = CompletableDeferred<Boolean>()
-            deferred.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(null) else ret.completeExceptionally(t) }
+            val deferred = CompletableDeferred<EmitterEvent>()
             GlobalScope.launch {
                 try {
                     val nullCoins = calcLater()
-                    deferred.complete(true)
-                    listeners.forEach { it.accept(EmitterStopRecalculationEvent(nullCoins)) }
+                    val ev = EmitterStopRecalculationEvent(nullCoins)
+                    deferred.invokeOnCompletion { t: Throwable? -> if (t == null) ret.complete(ev) else ret.completeExceptionally(t) }
+                    deferred.complete(ev)
+                    listeners.forEach { it.accept(ev) }
                 } catch (t: Throwable) {
                     deferred.completeExceptionally(t)
                 }

+ 1 - 1
src/main/kotlin/inn/ocsf/bee/freigeld/core/data/GlobalWorldImpl.kt

@@ -21,7 +21,7 @@ class GlobalWorldImpl : GlobalWorld {
 
     @PostConstruct
     fun init() {
-        personRepo.deleteAll()
+        //personRepo.deleteAll()
     }
 
     override fun addPerson(person: Person): UUID {

+ 6 - 7
src/main/kotlin/inn/ocsf/bee/freigeld/core/demo/DemoInMem.kt

@@ -1,6 +1,5 @@
 package inn.ocsf.bee.freigeld.core.demo
 
-import inn.ocsf.bee.freigeld.core.calc.CoinUtils
 import inn.ocsf.bee.freigeld.core.model.BankAccount
 import inn.ocsf.bee.freigeld.core.model.CentralBank
 import inn.ocsf.bee.freigeld.core.model.GlobalEmitter
@@ -37,11 +36,11 @@ class DemoInMem {
         emitter.emit(25, CoinValue.bi)
         emitter.emit(10, CoinValue.mega)
 */
-        CoinUtils.avaiues.flatMap { it.value }.forEach { c ->
+        /*CoinUtils.avaiues.flatMap { it.value }.forEach { c ->
             emitter.emit((10..40L).random(), c.first, c.second)
         }
-        bank.exchange(bank.getSelfAccount(), emitter, 2_300_000)
-        /*
+        //bank.exchange(bank.getSelfAccount(), emitter, 2_300_000)
+
         (0 until 100).map {
             val newHuman: NaturalPerson = PersonData.NaturalPersonImpl(Nomen.randomName())
             val oldHuman = world.getPersonByIdentity(PersonIdentityFullName(newHuman.fullName))
@@ -59,7 +58,7 @@ class DemoInMem {
                     .thenRun({ log.info("exchange ok") })
                     .exceptionally({ log.error("exchange error"); null })
         }
-         */
+*/
     }
 
     private fun doRand0() {
@@ -73,9 +72,9 @@ class DemoInMem {
         bank.exchange(dir.second, dir.first, x).thenRun({ log.info("exchange ok") }).exceptionally({ log.error("exchange error ${it.message}"); null })
     }
 
-    @Scheduled(initialDelay = 30 * 1000L, fixedDelay = 2000L)
+    //@Scheduled(initialDelay = 30 * 1000L, fixedDelay = 2000L)
     fun tick() {
-        //doRand0()
+        doRand0()
         log.info("tick")
     }
 

+ 47 - 2
src/main/kotlin/inn/ocsf/bee/freigeld/core/service/TicketService.kt

@@ -3,9 +3,16 @@ package inn.ocsf.bee.freigeld.core.service
 import inn.ocsf.bee.freigeld.core.model.Ticket
 import inn.ocsf.bee.freigeld.core.model.TicketStatus
 import inn.ocsf.bee.freigeld.core.model.data.TicketData
+import inn.ocsf.bee.freigeld.core.repo.ReactiveTicketRepository
 import inn.ocsf.bee.freigeld.core.repo.TicketRepository
+import kotlinx.coroutines.GlobalScope
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.launch
+import org.slf4j.LoggerFactory
 import org.springframework.stereotype.Service
 import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.CopyOnWriteArrayList
 import javax.annotation.PostConstruct
 import javax.inject.Inject
 
@@ -15,12 +22,45 @@ class TicketService {
     @Inject
     private lateinit var ticketRepo: TicketRepository
 
+    @Inject
+    private lateinit var ticketReact: ReactiveTicketRepository
+
+    private val listenerMap = ConcurrentHashMap<String, CopyOnWriteArrayList<(Ticket) -> Boolean?>>()
+
+    private val log = LoggerFactory.getLogger(javaClass)
+
+    private fun tick() {
+        listenerMap.keys().iterator().forEach { topic ->
+            peekFirst(topic)?.let { t ->
+                val ok: Boolean? = try {
+                    listenerMap.get(topic)?.map { fn ->
+                        try {
+                            fn(t)
+                        } catch (t: Throwable) {
+                            log.error("error in event", t)
+                            true
+                        }
+                    }?.firstOrNull { it ?: false }
+                } catch (t: Throwable) {
+                    log.error("error in event", t)
+                    true
+                }
+                if (ok == true) remove(t)
+            }
+        }
+    }
+
     @PostConstruct
     fun init() {
-        ticketRepo.deleteAll()
+        GlobalScope.launch {
+            do {
+                tick()
+                delay(100)
+            } while (true)
+        }
     }
 
-    fun remove(e: Ticket) {
+    private fun remove(e: Ticket) {
         val td = ticketRepo.findOneByTicketId(e.id).orElseThrow { RuntimeException("event ${e.id} not found") }
         td.status = TicketStatus.completed
         ticketRepo.save(td)
@@ -46,4 +86,9 @@ class TicketService {
             }
         }
     }
+
+    fun listen(topic: String, fn: (Ticket) -> Boolean?) {
+        listenerMap.computeIfAbsent(topic) { CopyOnWriteArrayList() }
+        listenerMap.get(topic)?.add(fn)
+    }
 }

+ 1 - 1
src/main/resources/application-dev.yaml

@@ -3,4 +3,4 @@ server:
 spring:
   data:
     mongodb:
-      uri: mongodb://roof:27017/frei
+      uri: mongodb://roof:27017/frei?replicaSet=frei0