# HG changeset patch # User sam # Date 1712424945 -25200 # Node ID b6427706c7b1fca7a4bc7842fb348c1e09fb399b # Parent ea708c16e0122093c38d7acd0550bd12fb0a5673 add: final (for now) storage API diff -r ea708c16e012 -r b6427706c7b1 semicongine/storage.nim --- a/semicongine/storage.nim Sat Apr 06 22:48:30 2024 +0700 +++ b/semicongine/storage.nim Sun Apr 07 00:35:45 2024 +0700 @@ -29,6 +29,7 @@ of SystemStorage: Path(getAppDir()) / STORAGE_NAME of UserStorage: + string(Path(getDataDir()) / Path(AppName())).createDir() Path(getDataDir()) / Path(AppName()) / STORAGE_NAME proc openDb(storageType: StorageType): DbConn = @@ -39,56 +40,107 @@ value TEXT NOT NULL )""")) -proc store[T](db: DbConn, key: string, value: T) = +proc storeInDb[T](db: DbConn, key: string, value: T) = const KEY_VALUE_TABLE_NAME = "shelf" db.exec(sql(&"""INSERT INTO {KEY_VALUE_TABLE_NAME} VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value """), key, $$value) -proc load[T](db: DbConn, key: string, default = default(T)): T = +proc loadFromDb[T](db: DbConn, key: string, default = default(T)): T = const KEY_VALUE_TABLE_NAME = "shelf" let dbResult = db.getValue(sql(&"""SELECT value FROM {KEY_VALUE_TABLE_NAME} WHERE key = ? """), key) if dbResult == "": return default return to[T](dbResult) -proc storageWorker[T](params: tuple[storageType: StorageType, keyChannel: ptr Channel[(StorageOperation, string)], dataChannel: ptr Channel[T]]) = - var db = params.storageType.openDb() +# mini async API +# +# LOADING ######################################3333 +# +# +proc purge*(storageType: StorageType) = + storageType.path().string.removeFile() + +type + LoadFuture*[T, U] = object + thread: Thread[U] + channel: ptr Channel[T] + result: T + +proc cleanup*[T, U](p: var LoadFuture[T, U]) = + if p.channel != nil: + p.thread.joinThread() + p.channel[].close() + deallocShared(p.channel) + p.channel = nil + +proc awaitResult*[T, U](p: var LoadFuture[T, U]): T = + if p.channel == nil: + return p.result + result = p.channel[].recv() + p.result = result + p.cleanup() + +proc hasResult*[T, U](p: var LoadFuture[T, U]): bool = + let ret = p.channel[].tryRecv() + result = ret.dataAvailable + if result: + p.result = ret.msg + p.cleanup() + +proc getResult*[T, U](p: LoadFuture[T, U]): T = + assert p.channel == nil, "Result is not available yet" + return p.result + +proc loadWorker[T](params: (StorageType, string, ptr Channel[T])) = + var db = params[0].openDb() defer: db.close() - var key: (StorageOperation, string) - while true: - key = params.keyChannel[].recv() - case key[0]: - of Read: params.dataChannel[].send(load[T](db, key[1])) - of Write: store(db, key[1], params.dataChannel[].recv()) - of KillWorker: break - -proc openStorage*[T](storageType: StorageType): Storage[T] = - result.keyChannel = cast[ptr Channel[(StorageOperation, string)]](allocShared0(sizeof(Channel[(StorageOperation, string)]))) - result.keyChannel[].open() - result.dataChannel = cast[ptr Channel[T]](allocShared0(sizeof(Channel[T]))) - result.dataChannel[].open() - createThread(result.thread, storageWorker[T], (storageType, result.keyChannel, result.dataChannel)) + let ret = loadFromDb[T](db, params[1]) + params[2][].send(ret) -proc get*[T](storage: Storage[T], key: string): Channel[T] = - storage.keyChannel.send((Read, key)) - return storage.dataChannel[] +proc load*[T](storageType: StorageType, key: string): LoadFuture[T, (StorageType, string, ptr Channel[T])] = + result.channel = cast[ptr Channel[T]](allocShared0(sizeof(Channel[T]))) + result.channel[].open() + createThread(result.thread, loadWorker[T], (storageType, key, result.channel)) -proc set*[T](storage: Storage[T], key: string, value: T) = - storage.keyChannel.send((Write, key)) - storage.dataChannel.send(value) +# STORING ######################################3333 +# +type + StoreFuture*[T, U] = object + thread: Thread[U] + channel: ptr Channel[T] + doneChannel: ptr Channel[bool] + +proc cleanup*[T, U](p: var StoreFuture[T, U]) = + if p.channel != nil: + p.thread.joinThread() + p.channel[].close() + p.doneChannel[].close() + deallocShared(p.channel) + deallocShared(p.doneChannel) + p.channel = nil + p.doneChannel = nil -proc purge*[T](storage: var Storage[T]) = - storage.closeStorage() - storage.path().string.removeFile() +proc awaitStored*[T, U](p: var StoreFuture[T, U]) = + discard p.doneChannel[].recv() + p.cleanup() +proc isStored*[T, U](p: var StoreFuture[T, U]): bool = + let ret = p.doneChannel[].tryRecv() + result = ret.dataAvailable + if ret.dataAvailable: + p.cleanup() -proc closeStorage*[T](storage: var Storage[T]) = - storage.keyChannel[].send((KillWorker, "")) - storage.thread.joinThread() +proc storeWorker[T](params: (StorageType, string, ptr Channel[T], ptr Channel[bool])) = + var db = params[0].openDb() + defer: db.close() + storeInDb(db, params[1], params[2][].recv()) + params[3][].send(true) - storage.keyChannel[].close() - deallocShared(storage.keyChannel) - - storage.dataChannel[].close() - deallocShared(storage.dataChannel) +proc store*[T](storageType: StorageType, key: string, value: T): StoreFuture[T, (StorageType, string, ptr Channel[T], ptr Channel[bool])] = + result.channel = cast[ptr Channel[T]](allocShared0(sizeof(Channel[T]))) + result.channel[].open() + result.doneChannel = cast[ptr Channel[bool]](allocShared0(sizeof(Channel[bool]))) + result.doneChannel[].open() + createThread(result.thread, storeWorker[T], (storageType, key, result.channel, result.doneChannel)) + result.channel[].send(value) diff -r ea708c16e012 -r b6427706c7b1 tests/test_storage.nim --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tests/test_storage.nim Sun Apr 07 00:35:45 2024 +0700 @@ -0,0 +1,52 @@ +import std/os + +import semicongine + +proc testSimple(storage: StorageType) = + const TEST_VALUE = 42 + const KEY = "test" + + # get default + var promise1 = load[int](storage, KEY) + assert promise1.awaitResult() == default(type(TEST_VALUE)) + + # save and load custom + var promise2 = store(storage, KEY, TEST_VALUE) + promise2.awaitStored() + promise1 = load[int](storage, KEY) + assert promise1.awaitResult() == TEST_VALUE + +proc testBusyWait(storage: StorageType) = + const TEST_VALUE = "43" + const KEY = "test2" + + # get default + var promise1 = load[string](storage, KEY) + while not promise1.hasResult(): + sleep(1) + assert promise1.getResult() == default(type(TEST_VALUE)) + + # save and load custom + var promise2 = store(storage, KEY, TEST_VALUE) + while not promise2.isStored(): + sleep(1) + promise1 = load[string](storage, KEY) + while not promise1.hasResult(): + sleep(1) + assert promise1.awaitResult() == TEST_VALUE + +proc main() = + echo "SystemStorage: Testing simple store/load" + SystemStorage.testSimple() + echo "SystemStorage: Testing store/load with busy wait" + SystemStorage.testBusyWait() + + UserStorage.purge() + echo "UserStorage: Testing simple store/load" + UserStorage.testSimple() + echo "UserStorage: Testing store/load with busy wait" + UserStorage.testBusyWait() + UserStorage.purge() + +when isMainModule: + main()