|
@@ -14,6 +14,7 @@ import java.util.*
|
|
|
import java.util.concurrent.CompletableFuture
|
|
|
import java.util.function.Consumer
|
|
|
import javax.annotation.PostConstruct
|
|
|
+import kotlin.math.absoluteValue
|
|
|
import kotlin.math.min
|
|
|
import kotlin.math.roundToLong
|
|
|
|
|
@@ -58,10 +59,10 @@ class DemoInMem {
|
|
|
val toAccout = bank.getAccounts(toPerson).first()
|
|
|
val dir = if (fromAccount.overall >= toAccout.overall) fromAccount to toAccout else toAccout to fromAccount
|
|
|
val x = Math.min(Math.round(dir.first.overall * 0.9), Math.max(0, Math.round(dir.first.overall * Math.random())))
|
|
|
- bank.exchange(dir.second, dir.first, x)
|
|
|
+ bank.exchange(dir.second, dir.first, x).thenRun({ log.info("exchange ok") }).exceptionally({ log.error("exchange error ${it.message}"); null })
|
|
|
}
|
|
|
|
|
|
- @Scheduled(initialDelay = 5 * 1000L, fixedDelay = 5 * 1000L)
|
|
|
+ @Scheduled(initialDelay = 5 * 1000L, fixedDelay = 500L)
|
|
|
fun tick() {
|
|
|
doRand0()
|
|
|
log.info("tick")
|
|
@@ -84,10 +85,11 @@ class GlobalBank : Bank {
|
|
|
@Autowired
|
|
|
private lateinit var emitter: GlobalEmitter
|
|
|
|
|
|
- val accountMap = mutableMapOf<UUID, DemoAccount>()
|
|
|
+ val accountMap = mutableMapOf<UUID, BankAccount>()
|
|
|
val personToAccountTable = mutableSetOf<Pair<UUID, UUID>>()
|
|
|
val globalQueue: Deque<BankEvent> = ArrayDeque()
|
|
|
val globalFutureMap = mutableMapOf<UUID, CompletableDeferred<Any>>()
|
|
|
+ val coinToAccountMap = mutableMapOf<UUID, UUID>()
|
|
|
|
|
|
private lateinit var selfId: UUID
|
|
|
private lateinit var selfAccount: BankAccount
|
|
@@ -114,9 +116,9 @@ class GlobalBank : Bank {
|
|
|
final override fun addAccount(person: UUID): BankAccount {
|
|
|
val account = if (person != selfId) {
|
|
|
val accountId = if (!accountMap.containsKey(person)) person else UUID.randomUUID() //first account assign to personId
|
|
|
- DemoAccount(accountId, person)
|
|
|
+ DemoOwnerAccout(DemoAccount(accountId, person), this)
|
|
|
} else {
|
|
|
- DemoSelfAccount(selfId)
|
|
|
+ DemoOwnerAccout(DemoSelfAccount(selfId), this)
|
|
|
}
|
|
|
accountMap[account.id] = account
|
|
|
personToAccountTable.add(person to account.id)
|
|
@@ -145,20 +147,20 @@ class GlobalBank : Bank {
|
|
|
CoinUtils.makeSomeStrategy(CoinStrategy(e.amount, if (e.from.id != emitter.id) e.from.coins else null, if (e.from.id != selfAccount.id) e.to.coins else null)).let { st ->
|
|
|
if (st.res!!.isOk()) {
|
|
|
st
|
|
|
- } else if (st.res == CoinStrategyType.error) {
|
|
|
- globalQueue.offerLast(ExchangeFailedEvent(UUID.randomUUID(), e, "strategy error"))
|
|
|
+ } else if (st.res!!.isErr()) {
|
|
|
+ globalQueue.offerLast(ExchangeFailedEvent(UUID.randomUUID(), e, "strategy error ${st.res}"))
|
|
|
null
|
|
|
} else {
|
|
|
TODO()
|
|
|
}
|
|
|
}?.let { st ->
|
|
|
- log.info("${selfAccount} do ${st.res?.name} transaction")
|
|
|
+ log.info("${selfAccount} do ${st.res?.name} transaction: ${e.from} -> ${e.to} = ${e.amount}")
|
|
|
var fromAmount = e.from.overall
|
|
|
var toAmount = e.to.overall
|
|
|
|
|
|
val leftExtracted = if (st.leftExtract != null) {
|
|
|
val old = e.from.extract(st.leftExtract?.map { it.id }!!)
|
|
|
- emitter.accept(old)
|
|
|
+ //emitter.accept(old) later
|
|
|
old
|
|
|
} else {
|
|
|
listOf()
|
|
@@ -177,7 +179,7 @@ class GlobalBank : Bank {
|
|
|
|
|
|
val rightExtracted = if (st.rightExtract != null) {
|
|
|
val old = e.to.extract(st.rightExtract?.map { it.id }!!)
|
|
|
- emitter.accept(old)
|
|
|
+ //emitter.accept(old) later
|
|
|
old
|
|
|
} else {
|
|
|
listOf()
|
|
@@ -198,8 +200,13 @@ class GlobalBank : Bank {
|
|
|
}
|
|
|
|
|
|
if (st.rightCashback != null) {
|
|
|
- val someCoins = emitter.extract(st.rightCashback?.map { it.id }!!)
|
|
|
+ val localCoins = leftExtracted.map { it.id to it }.toMap().toMutableMap()
|
|
|
+ val externalCoins = emitter.extract(st.rightCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
|
|
|
+ val sameCoins = st.rightCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
|
|
|
+ val someCoins = sameCoins + externalCoins
|
|
|
e.to.accept(someCoins)
|
|
|
+ someCoins.forEach { localCoins.remove(it.id) }
|
|
|
+ emitter.accept(localCoins.values)
|
|
|
if (e.to.id != selfAccount.id) {
|
|
|
emitter.free(someCoins)
|
|
|
} else {
|
|
@@ -208,8 +215,13 @@ class GlobalBank : Bank {
|
|
|
}
|
|
|
|
|
|
if (st.leftCashback != null) {
|
|
|
- val someCoins = emitter.extract(st.leftCashback?.map { it.id }!!)
|
|
|
+ val localCoins = rightExtracted.map { it.id to it }.toMap().toMutableMap()
|
|
|
+ val externalCoins = emitter.extract(st.leftCashback?.filterNot { localCoins.containsKey(it.id) }?.map { it.id }!!)
|
|
|
+ val sameCoins = st.leftCashback?.filter { localCoins.containsKey(it.id) } ?: listOf()
|
|
|
+ val someCoins = sameCoins + externalCoins
|
|
|
e.from.accept(someCoins)
|
|
|
+ someCoins.forEach { localCoins.remove(it.id) }
|
|
|
+ emitter.accept(localCoins.values)
|
|
|
if (e.from.id != selfAccount.id) {
|
|
|
emitter.free(someCoins)
|
|
|
} else {
|
|
@@ -219,16 +231,34 @@ class GlobalBank : Bank {
|
|
|
|
|
|
val creditOk = (fromAmount - e.amount == e.from.overall)
|
|
|
val debitOk = (toAmount + e.amount == e.to.overall)
|
|
|
- if (debitOk && creditOk) {
|
|
|
+ val fromCoins = e.from.coins
|
|
|
+ val toCoins = e.to.coins
|
|
|
+ val uniqOk = if (fromCoins != null) {
|
|
|
+ if (toCoins != null) {
|
|
|
+ fromCoins.map { it.id }.toSet().intersect(toCoins.map { it.id }.toSet()).isEmpty()
|
|
|
+ } else {
|
|
|
+ true
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ true
|
|
|
+ }
|
|
|
+ if (debitOk && creditOk && uniqOk) {
|
|
|
true
|
|
|
} else {
|
|
|
TODO()
|
|
|
false
|
|
|
}
|
|
|
+ }?.let { ok ->
|
|
|
+ if (ok) {
|
|
|
+ globalQueue.offerLast(ExchangeSuccessEvent(UUID.randomUUID(), e))
|
|
|
+ } else {
|
|
|
+ globalQueue.offerLast(ExchangeFailedEvent(UUID.randomUUID(), e, "exchange strategy failed"))
|
|
|
+ }
|
|
|
+ ok
|
|
|
} ?: true
|
|
|
}
|
|
|
is ExchangeSuccessEvent -> {
|
|
|
- globalFutureMap.get(e.parentEvent.id)?.let {
|
|
|
+ globalFutureMap.get(e.parentEvent.id)!!.let {
|
|
|
globalFutureMap.remove(e.parentEvent.id)
|
|
|
it.complete(true)
|
|
|
}
|
|
@@ -241,6 +271,7 @@ class GlobalBank : Bank {
|
|
|
}
|
|
|
true
|
|
|
}
|
|
|
+ is BankPauseOnRecalcEvent -> !e.paused
|
|
|
else -> throw IllegalArgumentException("wrong event type ${e.javaClass.name}")
|
|
|
}
|
|
|
}
|
|
@@ -263,18 +294,21 @@ class GlobalBank : Bank {
|
|
|
emitter.listen { e ->
|
|
|
log.info("emitter event ${e.javaClass.name}")
|
|
|
when (e) {
|
|
|
+ is EmitterStartRecalculationEvent -> {
|
|
|
+ globalQueue.offerFirst(BankPauseOnRecalcEvent(UUID.randomUUID(), e))
|
|
|
+ }
|
|
|
is EmitterStopRecalculationEvent -> {
|
|
|
- accountMap.values.map { it to it.internalCoins.keys.intersect(e.nullCoins) }.filter { it.second.isNotEmpty() }.forEach { ap ->
|
|
|
- ap.first._overall = null
|
|
|
- val nullCoins = ap.second.map { ap.first.extractOne(it) }
|
|
|
+ accountMap.values.map { it to it.coins?.map { it.id }?.intersect(e.nullCoins) }.filter { it.second?.isNotEmpty() ?: false }.forEach { ap ->
|
|
|
+ val nullCoins = ap.second?.map { ap.first.extractOne(it) }?.toSet() ?: setOf()
|
|
|
emitter.accept(nullCoins)
|
|
|
}
|
|
|
+ globalQueue.filter { it is BankPauseOnRecalcEvent }.map { it as BankPauseOnRecalcEvent }.first().paused = false
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Scheduled(initialDelay = 1000L, fixedDelay = 1000L)
|
|
|
+ @Scheduled(initialDelay = 1000L, fixedDelay = 100L)
|
|
|
fun tick() {
|
|
|
var pollCount = 0
|
|
|
while (globalQueue.isNotEmpty() && pollCount < maxPollCount) {
|
|
@@ -286,6 +320,42 @@ class GlobalBank : Bank {
|
|
|
log.info("poll")
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ class DemoOwnerAccout(val inner: BankAccount, val owner: GlobalBank) : BankAccount {
|
|
|
+
|
|
|
+ override fun getOverall(): Long {
|
|
|
+ return inner.overall
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun getCoins(): MutableCollection<Coin>? {
|
|
|
+ return inner.coins
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun acceptOne(coin: Coin) {
|
|
|
+ if (owner.coinToAccountMap.containsKey(coin.id)) {
|
|
|
+ throw RuntimeException("already owned")
|
|
|
+ } else {
|
|
|
+ owner.coinToAccountMap.put(coin.id, this.id)
|
|
|
+ }
|
|
|
+ inner.acceptOne(coin)
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun getId(): UUID {
|
|
|
+ return inner.id
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun extractOne(coinId: UUID): Coin? {
|
|
|
+ if (owner.coinToAccountMap.remove(coinId) == null) {
|
|
|
+ throw RuntimeException("owner missing")
|
|
|
+ }
|
|
|
+ return inner.extractOne(coinId)
|
|
|
+ }
|
|
|
+
|
|
|
+ override fun toString(): String {
|
|
|
+ return "*${inner}"
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
abstract class AbstractBankEvent(val eventId: UUID) : BankEvent {
|
|
@@ -321,6 +391,8 @@ class ExchangeFailedException(val event: ExchangeBankEvent, message: String = "e
|
|
|
|
|
|
class ExchangeStartEvent(eventId: UUID, val to: BankAccount, val from: BankAccount, val amount: Long, var retry: Boolean = false) : ExchangeBankEvent(eventId)
|
|
|
|
|
|
+class BankPauseOnRecalcEvent(eventId: UUID, val emitterEvent: EmitterStartRecalculationEvent, var paused: Boolean = true) : AbstractBankEvent(eventId)
|
|
|
+
|
|
|
@Service
|
|
|
class DemoInMemEmitter : GlobalEmitter {
|
|
|
|
|
@@ -360,12 +432,11 @@ class DemoInMemEmitter : GlobalEmitter {
|
|
|
log.info("emitter emit ${count} of ${value}")
|
|
|
}
|
|
|
|
|
|
- override fun accept(nullCoins: MutableCollection<Coin>) {
|
|
|
- val nullIds = nullCoins.map { it.id }
|
|
|
- coinFreeSet.removeAll(nullIds)
|
|
|
- coinExtractedSet.removeAll(nullIds)
|
|
|
- nullCoins.forEach { coinValueIndex.computeIfAbsent(it.value to it.era) { c -> mutableSetOf() }.add(it.id) }
|
|
|
- log.info("emitter redeem ${nullCoins.size} coins, ${CoinUtils.sum(nullCoins)}")
|
|
|
+ override fun acceptOne(nullCoin: Coin) {
|
|
|
+ coinFreeSet.remove(nullCoin.id)
|
|
|
+ coinExtractedSet.remove(nullCoin.id)
|
|
|
+ coinValueIndex.computeIfAbsent(nullCoin.value to nullCoin.era) { c -> mutableSetOf() }.add(nullCoin.id)
|
|
|
+ log.info("emitter redeem coin ${CoinUtils.sum(listOf(nullCoin))}")
|
|
|
}
|
|
|
|
|
|
override fun getId(): UUID {
|
|
@@ -374,7 +445,9 @@ class DemoInMemEmitter : GlobalEmitter {
|
|
|
|
|
|
override fun extractOne(coinId: UUID): Coin {
|
|
|
val coin = coinMap.get(coinId)!!
|
|
|
- coinExtractedSet.add(coin.id)
|
|
|
+ if (!coinExtractedSet.add(coin.id)) {
|
|
|
+ throw RuntimeException("coin already extracted")
|
|
|
+ }
|
|
|
coinValueIndex.computeIfAbsent(coin.value to coin.era) { c -> mutableSetOf() }.remove(coin.id)
|
|
|
log.info("emitter extract ${CoinUtils.sum(listOf(coin))}, ${CoinUtils.sumString(listOf(coin))}")
|
|
|
return coin
|
|
@@ -579,31 +652,36 @@ open class DemoAccount(val accountId: UUID, personId: UUID) : BankAccount {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- override fun accept(coins: MutableCollection<Coin>) {
|
|
|
- coins.forEach {
|
|
|
- if (it.id != null && !internalCoins.containsKey(it.id)) {
|
|
|
- internalCoins.put(it.id, it)
|
|
|
- incOverall(it.current)
|
|
|
- } else if (it.id == null) {
|
|
|
- throw IllegalArgumentException("fake coin is not acceptable")
|
|
|
- } else if (internalCoins.containsKey(it.id)) {
|
|
|
- throw IllegalArgumentException("same coin is not acceptable")
|
|
|
- }
|
|
|
+ override fun acceptOne(coin: Coin) {
|
|
|
+ if (coin.id != null && !internalCoins.containsKey(coin.id)) {
|
|
|
+ internalCoins.put(coin.id, coin)
|
|
|
+ incOverall(coin.current)
|
|
|
+ } else if (coin.id == null) {
|
|
|
+ throw IllegalArgumentException("fake coin is not acceptable")
|
|
|
+ } else if (internalCoins.containsKey(coin.id)) {
|
|
|
+ throw IllegalArgumentException("same coin is not acceptable")
|
|
|
}
|
|
|
- log.info("${this} debit coins ${CoinUtils.sumString(coins)}")
|
|
|
+ log.info("${this} debit coin ${CoinUtils.sum(listOf(coin))}")
|
|
|
}
|
|
|
|
|
|
|
|
|
override fun toString(): String {
|
|
|
- return accountId.hashCode().toString(16)
|
|
|
+ return accountId.hashCode().absoluteValue.toString(16)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
enum class CoinStrategyType {
|
|
|
- error, L2R_RL, L2R_R, B2R_E, L2R, E2B, L2R_E;
|
|
|
+ L2R_RL, L2R_R, B2R_E, L2R, E2B, L2R_E, ERR_LC, ERR_NUL;
|
|
|
+
|
|
|
+ companion object {
|
|
|
+ val OK = setOf(L2R_RL, L2R_R, B2R_E, L2R, E2B, L2R_E)
|
|
|
+ val ERR = setOf(ERR_LC, ERR_NUL)
|
|
|
+ }
|
|
|
|
|
|
- fun isOk(): Boolean = this != error
|
|
|
+ fun isOk(): Boolean = OK.contains(this)
|
|
|
+
|
|
|
+ fun isErr(): Boolean = ERR.contains(this)
|
|
|
}
|
|
|
|
|
|
class CoinStrategy(val credit: Long, val leftCoins: Collection<Coin>? = null, val rightCoins: Collection<Coin>? = null) {
|
|
@@ -709,6 +787,9 @@ class CoinUtils {
|
|
|
if (creditCoinsCashback.isNotEmpty()) {
|
|
|
val creditCashback = sum(creditCoinsReal) - st.credit
|
|
|
if (st.rightCoins != null && creditCashback <= sum(st.rightCoins)) {
|
|
|
+ if (st.leftCoins.map { it.id }.toSet().intersect(st.rightCoins.map { it.id }.toSet()).isNotEmpty()) {
|
|
|
+ throw RuntimeException("same coins, alarma")
|
|
|
+ }
|
|
|
val debitCoins = splitCoins(st.rightCoins, creditCashback)
|
|
|
val debitCoinsReal = debitCoins.filter { it.id != null }
|
|
|
val debitCoinsCashback = debitCoins.filter { it.id == null }
|
|
@@ -765,7 +846,7 @@ class CoinUtils {
|
|
|
st
|
|
|
}
|
|
|
} else {
|
|
|
- st.res = CoinStrategyType.error
|
|
|
+ st.res = CoinStrategyType.ERR_LC
|
|
|
st
|
|
|
}
|
|
|
} else {
|
|
@@ -774,7 +855,7 @@ class CoinUtils {
|
|
|
st
|
|
|
}
|
|
|
} else {
|
|
|
- st.res = CoinStrategyType.error
|
|
|
+ st.res = CoinStrategyType.ERR_NUL
|
|
|
st
|
|
|
}
|
|
|
}
|