feat(users/Profpatsch/whatcd-resolver): conduit shenanigans

Some experiments in improving search efficiency.

This is more of an “is this possible” thing; it looks like it is
possible, but it does not really improve anything regarding the rate
limiting.

The idea is that we can start everything at the same time as asyncs,
but use a semaphore so that only 5 things run at once. That also means
that as soon as something is done, we immediately start the next task.
The asyncs are guaranteed to be cleaned up by the `ResourceT`
wrapper (eventually).
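
Roughly, the core of the idea looks like this (a minimal sketch using
only the unliftio primitives this change imports, skipping the
Conduit/ResourceT plumbing; `boundedConcurrently` is just an
illustrative name):

    import UnliftIO (async, newQSem, wait, withQSem)

    -- Start every action right away, but gate each one behind a
    -- semaphore so at most `limit` of them do real work at a time.
    boundedConcurrently :: Int -> [IO a] -> IO [a]
    boundedConcurrently limit acts = do
      qsem <- newQSem limit
      asyncs <- traverse (\act -> async (withQSem qsem act)) acts
      -- results come back in input order
      traverse wait asyncs

Unlike the real code below, this sketch does not tie the asyncs to a
`ResourceT` scope, so nothing cleans them up if the waiting is
interrupted.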

It’s pretty cool how Conduit makes writing these side-effecting
streaming pipelines possible.

Change-Id: Ibadead7db293373b415840960602fa71920fc653
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13246
Tested-by: BuildkiteCI
Reviewed-by: Profpatsch <mail@profpatsch.de>
Profpatsch 2025-03-11 18:42:31 +01:00
parent ae0e75aaf2
commit 42da189180
2 changed files with 156 additions and 21 deletions


@@ -11,6 +11,7 @@ import Conduit (ConduitT)
import Conduit qualified as Cond
import Control.Monad.Logger.CallStack
import Control.Monad.Reader
import Control.Monad.Trans.Resource (resourceForkWith)
import Data.Aeson qualified as Json
import Data.Aeson.BetterErrors qualified as Json
import Data.Aeson.Key qualified as Key
@@ -35,6 +36,7 @@ import MyLabel
import MyPrelude
import Network.HTTP.Types
import Network.Wai.Parse qualified as Wai
import OpenTelemetry.Context.ThreadLocal qualified as Otel
import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
import Optional
import Parse (Parse, mapLookup, mapLookupMay, runParse)
@@ -42,7 +44,12 @@ import Parse qualified
import Postgres.Decoder qualified as Dec
import Postgres.MonadPostgres
import Pretty
import UnliftIO (MonadUnliftIO)
import RevList (RevList)
import RevList qualified
import UnliftIO (MonadUnliftIO, askRunInIO, async, newQSem, withQSem)
import UnliftIO.Async (Async)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent (threadDelay)
import Prelude hiding (length, span)
class MonadRedacted m where
@@ -330,11 +337,62 @@ parseTourGroups opts =
)
)
testChunkBetween :: IO [NonEmpty Integer]
testChunkBetween = do
Cond.runConduit $
( do
Cond.yield [0]
Cond.yield [1, 2]
Cond.yield [3, 4]
Cond.yield []
Cond.yield [5, 6]
Cond.yield [7]
)
.| filterEmpty
.| chunkBetween (t2 #low 2 #high 4)
.| Cond.sinkList
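-- | Drop empty lists from the stream, passing the rest on as 'NonEmpty'.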
filterEmpty :: (Monad m) => ConduitT [a] (NonEmpty a) m ()
filterEmpty = Cond.awaitForever yieldIfNonEmpty
-- | Chunk the given stream of lists into chunks that are between @low@ and @high@ elements long (both inclusive).
-- The last chunk might be shorter than @low@.
chunkBetween ::
( HasField "low" p Natural,
HasField "high" p Natural,
Monad m
) =>
p ->
ConduitT (NonEmpty a) (NonEmpty a) m ()
chunkBetween opts = do
go mempty
where
low = min opts.low opts.high
high = max opts.low opts.high
high' = assertField (boundedNatural @Int) high
l = lengthNatural
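-- accumulate input until at least @low@ elements are buffered, then emit chunks of at most @high@; flush whatever is left at end of input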
go !acc = do
if l acc >= low
then do
let (c, rest) = List.splitAt high' acc
yieldIfNonEmpty c
go rest
else do
xs <- Cond.await
case xs of
Nothing -> yieldIfNonEmpty acc
Just xs' -> go (acc <> NonEmpty.toList xs')
yieldIfNonEmpty :: (Monad m) => [a] -> ConduitT i (NonEmpty a) m ()
yieldIfNonEmpty = \case
IsNonEmpty xs -> Cond.yield xs
IsEmpty -> pure ()
redactedPagedSearchAndInsert ::
forall m.
( MonadLogger m,
MonadPostgres m,
MonadUnliftIO m,
MonadOtel m
) =>
Json.Parse ErrorTree TourGroups ->
-- | A redacted request that returns a paged result
@@ -350,13 +408,15 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
let remainingPages = firstPage.pages - 1
logInfo [fmt|Got the first page, found {remainingPages} more pages|]
let otherPagesNum = [(2 :: Natural) .. remainingPages]
let withFirst = do
firstBlock <- Cond.await
Cond.yield (firstBlock & maybe [firstPage] (firstPage :))
Cond.awaitForever pure
Cond.runConduit @(Transaction m) $
runConcurrentlyBunched (lbl #batchSize 5) (go . Just <$> otherPagesNum)
.| withFirst
( do
Cond.yield (singleton firstPage)
case otherPagesNum of
IsNonEmpty o -> runConcurrentlyBunched' (lbl #batchSize 5) (go . Just <$> o)
IsEmpty -> pure ()
)
.| chunkBetween (t2 #low 5 #high 10)
.| Cond.mapMC
( \block ->
block
@@ -386,7 +446,8 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
[T2 "torrentId" Int "fullJsonResult" Json.Value]
) ->
Transaction m ()
insertTourGroupsAndTorrents dat = do
insertTourGroupsAndTorrents dat = inSpan' "Insert Tour Groups & Torrents" $ \span -> do
addAttribute span "tour_group.length" (dat & lengthNatural, naturalDecimalT)
let tourGroups = dat <&> (.tourGroup)
let torrents = dat <&> (.torrents)
insertTourGroups tourGroups
@@ -493,25 +554,97 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
pure ()
-- | Run the given actions in parallel, but only allow a certain number of concurrent requests.
runConcurrentlyBunched ::
-- Will start new threads as soon as a semaphore slot becomes available, but always returns results in input order.
runConcurrentlyBunched' ::
forall m opts a.
( MonadUnliftIO m,
HasField "batchSize" opts Natural
) =>
opts ->
-- | list of actions to run
[m a] ->
ConduitT () [a] m ()
runConcurrentlyBunched opts acts = do
NonEmpty (m a) ->
ConduitT () (NonEmpty a) m ()
runConcurrentlyBunched' opts acts = do
let batchSize = assertField (boundedNatural @Int) opts.batchSize
let go :: [m a] -> ConduitT () [a] m ()
go [] = pure ()
go acts' = do
let (batch, rest) = splitAt batchSize acts'
res <- lift $ mapConcurrentlyTraced id batch
Cond.yield res
go rest
go acts
runInIO <- lift askRunInIO
-- NB: make sure none of the asyncs escape from here
Cond.transPipe (Cond.runResourceT @m) $ do
-- This use of resourceForkWith looks a little off, but it's the only way to return an `Async a`; I hope it brackets correctly lol
ctx <- Otel.getContext
let spawn :: m a -> Cond.ResourceT m (Async a)
spawn f = resourceForkWith (\io -> async (io >> Otel.attachContext ctx >> runInIO f)) (pure ())
qsem <- newQSem batchSize
-- spawn all asyncs here, but limit how many run concurrently by threading through a semaphore
spawned <- for acts $ \act ->
lift $ spawn $ withQSem qsem $ act
Cond.yieldMany (spawned & NonEmpty.toList)
.| awaitAllReadyAsyncs
-- | Consume as many asyncs as are ready and return their results.
--
-- Make sure they are already running (if you use 'Cond.yieldM' they are only started when awaited by the conduit).
--
-- If any async throws an exception, the exception will be thrown in the conduit.
-- Already running asyncs will not be cancelled. (TODO: can we somehow make that a thing?)
awaitAllReadyAsyncs :: forall m a. (MonadIO m) => ConduitT (Async a) (NonEmpty a) m ()
awaitAllReadyAsyncs = go
where
-- wait for the next async and then consume as many as are already done
go :: ConduitT (Async a) (NonEmpty a) m ()
go = do
Cond.await >>= \case
Nothing -> pure ()
Just nextAsync -> do
res <- Async.wait nextAsync
-- consume as many asyncs as are already done
goAllReady (RevList.singleton res)
goAllReady :: RevList a -> ConduitT (Async a) (NonEmpty a) m ()
goAllReady !acc = do
next <- Cond.await
case next of
Nothing -> yieldIfNonEmptyRev acc
Just a -> do
thereAlready <- Async.poll a
case thereAlready of
Nothing -> do
-- consumed everything that was available, yield and wait for the next block
yieldIfNonEmptyRev acc
Cond.leftover a
go
Just _ -> do
-- will not block
res <- Async.wait a
goAllReady (acc <> RevList.singleton res)
yieldIfNonEmptyRev :: RevList a -> ConduitT (Async a) (NonEmpty a) m ()
yieldIfNonEmptyRev r = do
case r & RevList.revListToList of
IsNonEmpty e -> Cond.yield e
IsEmpty -> pure ()
testAwaitAllReadyAsyncs :: IO [[Char]]
testAwaitAllReadyAsyncs =
Cond.runConduit $
( do
running <-
lift $
sequence
[ async (print "foo" >> pure 'a'),
async (pure 'b'),
async (threadDelay 5000 >> pure 'c'),
async (print "bar" >> pure '5'),
async (threadDelay 1_000_000 >> print "lol" >> pure 'd'),
async (error "no"),
async (print "baz" >> pure 'f')
]
Cond.yieldMany running
)
.| awaitAllReadyAsyncs
.| Cond.mapC toList
.| Cond.sinkList
-- | Run the field parser and throw an uncatchable assertion error if it fails.
assertField :: (HasCallStack) => FieldParser from to -> from -> to


@@ -89,6 +89,7 @@ library
pa-run-command,
aeson-better-errors,
aeson,
async,
bencode,
blaze-html,
bytestring,
@@ -109,6 +110,7 @@ library
mtl,
network-uri,
random,
resourcet,
resource-pool,
template-haskell,
postgresql-simple,