From 42da189180e5263adf3a8504e550e453f77a27f7 Mon Sep 17 00:00:00 2001
From: Profpatsch
Date: Tue, 11 Mar 2025 18:42:31 +0100
Subject: [PATCH] feat(users/Profpatsch/whatcd-resolver): conduit shenanigans
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Some experiments in speeding up the search. This is more of an
“is this possible” thing, and it looks like it is possible, but it
does not really improve anything regarding the rate limiting.

The idea is that we can start everything at the same time as asyncs,
but use a semaphore so that only 5 things run at once. That also means
that as soon as something is done, we immediately start the next task.

The asyncs are guaranteed to be cleaned up by the `ResourceT` wrapper
(eventually).

It’s pretty cool how Conduit makes writing these side-effecting
pipelines possible.

Change-Id: Ibadead7db293373b415840960602fa71920fc653
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13246
Tested-by: BuildkiteCI
Reviewed-by: Profpatsch
---
 .../whatcd-resolver/src/Redacted.hs           | 175 +++++++++++++++---
 .../whatcd-resolver/whatcd-resolver.cabal     |   2 +
 2 files changed, 156 insertions(+), 21 deletions(-)

diff --git a/users/Profpatsch/whatcd-resolver/src/Redacted.hs b/users/Profpatsch/whatcd-resolver/src/Redacted.hs
index 50ad5de33..eeecd84cf 100644
--- a/users/Profpatsch/whatcd-resolver/src/Redacted.hs
+++ b/users/Profpatsch/whatcd-resolver/src/Redacted.hs
@@ -11,6 +11,7 @@ import Conduit (ConduitT)
 import Conduit qualified as Cond
 import Control.Monad.Logger.CallStack
 import Control.Monad.Reader
+import Control.Monad.Trans.Resource (resourceForkWith)
 import Data.Aeson qualified as Json
 import Data.Aeson.BetterErrors qualified as Json
 import Data.Aeson.Key qualified as Key
@@ -35,6 +36,7 @@ import MyLabel
 import MyPrelude
 import Network.HTTP.Types
 import Network.Wai.Parse qualified as Wai
+import OpenTelemetry.Context.ThreadLocal qualified as Otel
 import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
 import Optional
 import Parse (Parse, mapLookup, mapLookupMay, runParse)
@@ -42,7 +44,12 @@ import Parse qualified
 import Postgres.Decoder qualified as Dec
 import Postgres.MonadPostgres
 import Pretty
-import UnliftIO (MonadUnliftIO)
+import RevList (RevList)
+import RevList qualified
+import UnliftIO (MonadUnliftIO, askRunInIO, async, newQSem, withQSem)
+import UnliftIO.Async (Async)
+import UnliftIO.Async qualified as Async
+import UnliftIO.Concurrent (threadDelay)
 import Prelude hiding (length, span)
 
 class MonadRedacted m where
@@ -330,11 +337,62 @@ parseTourGroups opts =
             )
         )
 
+testChunkBetween :: IO [NonEmpty Integer]
+testChunkBetween = do
+  Cond.runConduit $
+    ( do
+        Cond.yield [0]
+        Cond.yield [1, 2]
+        Cond.yield [3, 4]
+        Cond.yield []
+        Cond.yield [5, 6]
+        Cond.yield [7]
+    )
+      .| filterEmpty
+      .| chunkBetween (t2 #low 2 #high 4)
+      .| Cond.sinkList
+
+filterEmpty :: (Monad m) => ConduitT [a] (NonEmpty a) m ()
+filterEmpty = Cond.awaitForever yieldIfNonEmpty
+
+-- | Chunk the given stream of lists into chunks that are between @low@ and @high@ (both incl).
+-- The last chunk might be shorter than low.
+chunkBetween ::
+  ( HasField "low" p Natural,
+    HasField "high" p Natural,
+    Monad m
+  ) =>
+  p ->
+  ConduitT (NonEmpty a) (NonEmpty a) m ()
+chunkBetween opts = do
+  go mempty
+  where
+    low = min opts.low opts.high
+    high = max opts.low opts.high
+    high' = assertField (boundedNatural @Int) high
+    l = lengthNatural
+    go !acc = do
+      if l acc >= low
+        then do
+          let (c, rest) = List.splitAt high' acc
+          yieldIfNonEmpty c
+          go rest
+        else do
+          xs <- Cond.await
+          case xs of
+            Nothing -> yieldIfNonEmpty acc
+            Just xs' -> go (acc <> NonEmpty.toList xs')
+
+yieldIfNonEmpty :: (Monad m) => [a] -> ConduitT i (NonEmpty a) m ()
+yieldIfNonEmpty = \case
+  IsNonEmpty xs -> Cond.yield xs
+  IsEmpty -> pure ()
+
 redactedPagedSearchAndInsert ::
   forall m.
   ( MonadLogger m,
     MonadPostgres m,
-    MonadUnliftIO m
+    MonadOtel m
   ) =>
   Json.Parse ErrorTree TourGroups ->
   -- | A redacted request that returns a paged result
@@ -350,13 +408,15 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
   let remainingPages = firstPage.pages - 1
   logInfo [fmt|Got the first page, found {remainingPages} more pages|]
   let otherPagesNum = [(2 :: Natural) .. remainingPages]
-  let withFirst = do
-        firstBlock <- Cond.await
-        Cond.yield (firstBlock & maybe [firstPage] (firstPage :))
-        Cond.awaitForever pure
+
   Cond.runConduit @(Transaction m) $
-    runConcurrentlyBunched (lbl #batchSize 5) (go . Just <$> otherPagesNum)
-      .| withFirst
+    ( do
+        Cond.yield (singleton firstPage)
+        case otherPagesNum of
+          IsNonEmpty o -> runConcurrentlyBunched' (lbl #batchSize 5) (go . Just <$> o)
+          IsEmpty -> pure ()
+    )
+      .| chunkBetween (t2 #low 5 #high 10)
       .| Cond.mapMC
         ( \block ->
             block
@@ -386,7 +446,8 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
           [T2 "torrentId" Int "fullJsonResult" Json.Value]
       ) ->
       Transaction m ()
-    insertTourGroupsAndTorrents dat = do
+    insertTourGroupsAndTorrents dat = inSpan' "Insert Tour Groups & Torrents" $ \span -> do
+      addAttribute span "tour_group.length" (dat & lengthNatural, naturalDecimalT)
       let tourGroups = dat <&> (.tourGroup)
       let torrents = dat <&> (.torrents)
       insertTourGroups tourGroups
@@ -493,25 +554,97 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
     pure ()
 
 -- | Traverse over the given function in parallel, but only allow a certain amount of concurrent requests.
-runConcurrentlyBunched ::
+-- Will start new threads as soon as a resource becomes available, but always return results in input ordering.
+runConcurrentlyBunched' ::
   forall m opts a.
   ( MonadUnliftIO m,
     HasField "batchSize" opts Natural
   ) =>
   opts ->
   -- | list of actions to run
-  [m a] ->
-  ConduitT () [a] m ()
-runConcurrentlyBunched opts acts = do
+  NonEmpty (m a) ->
+  ConduitT () (NonEmpty a) m ()
+runConcurrentlyBunched' opts acts = do
   let batchSize = assertField (boundedNatural @Int) opts.batchSize
-  let go :: [m a] -> ConduitT () [a] m ()
-  go [] = pure ()
-  go acts' = do
-    let (batch, rest) = splitAt batchSize acts'
-    res <- lift $ mapConcurrentlyTraced id batch
-    Cond.yield res
-    go rest
-  go acts
+  runInIO <- lift askRunInIO
+  -- NB: make sure none of the asyncs escape from here
+  Cond.transPipe (Cond.runResourceT @m) $ do
+    -- This use of resourceForkWith looks a little off, but it’s the only way to return an `Async a`, I hope it brackets correctly lol
+    ctx <- Otel.getContext
+    let spawn :: m a -> Cond.ResourceT m (Async a)
+        spawn f = resourceForkWith (\io -> async (io >> Otel.attachContext ctx >> runInIO f)) (pure ())
+    qsem <- newQSem batchSize
+
+    -- spawn all asyncs here, but limit how many get run consecutively by threading through a semaphore
+    spawned <- for acts $ \act ->
+      lift $ spawn $ withQSem qsem $ act
+
+    Cond.yieldMany (spawned & NonEmpty.toList)
+      .| awaitAllReadyAsyncs
+
+-- | Consume as many asyncs as are ready and return their results.
+--
+-- Make sure they are already running (if you use 'Cond.yieldM' they are only started when awaited by the conduit).
+--
+-- If any async throws an exception, the exception will be thrown in the conduit.
+-- Already running asyncs will not be cancelled. (TODO: can we somehow make that a thing?)
+awaitAllReadyAsyncs :: forall m a. (MonadIO m) => ConduitT (Async a) (NonEmpty a) m ()
+awaitAllReadyAsyncs = go
+  where
+    -- wait for the next async and then consume as many as are already done
+    go :: ConduitT (Async a) (NonEmpty a) m ()
+    go = do
+      Cond.await >>= \case
+        Nothing -> pure ()
+        Just nextAsync -> do
+          res <- Async.wait nextAsync
+          -- consume as many asyncs as are already done
+          goAllReady (RevList.singleton res)
+
+    goAllReady :: RevList a -> ConduitT (Async a) (NonEmpty a) m ()
+    goAllReady !acc = do
+      next <- Cond.await
+      case next of
+        Nothing -> yieldIfNonEmptyRev acc
+        Just a -> do
+          thereAlready <- Async.poll a
+          case thereAlready of
+            Nothing -> do
+              -- consumed everything that was available, yield and wait for the next block
+              yieldIfNonEmptyRev acc
+              Cond.leftover a
+              go
+            Just _ -> do
+              -- will not block
+              res <- Async.wait a
+              goAllReady (acc <> RevList.singleton res)
+
+    yieldIfNonEmptyRev :: RevList a -> ConduitT (Async a) (NonEmpty a) m ()
+    yieldIfNonEmptyRev r = do
+      case r & RevList.revListToList of
+        IsNonEmpty e -> Cond.yield e
+        IsEmpty -> pure ()
+
+testAwaitAllReadyAsyncs :: IO [[Char]]
+testAwaitAllReadyAsyncs =
+  Cond.runConduit $
+    ( do
+        running <-
+          lift $
+            sequence
+              [ async (print "foo" >> pure 'a'),
+                async (pure 'b'),
+                async (threadDelay 5000 >> pure 'c'),
+                async (print "bar" >> pure '5'),
+                async (threadDelay 1_000_000 >> print "lol" >> pure 'd'),
+                async (error "no"),
+                async (print "baz" >> pure 'f')
+              ]
+        Cond.yieldMany running
+    )
+      .| awaitAllReadyAsyncs
+      .| Cond.mapC toList
+      .| Cond.sinkList
 
 -- | Run the field parser and throw an uncatchable assertion error if it fails.
 assertField :: (HasCallStack) => FieldParser from to -> from -> to
diff --git a/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal b/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal
index 0f798a443..1d4ee097c 100644
--- a/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal
+++ b/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal
@@ -89,6 +89,7 @@ library
     pa-run-command,
     aeson-better-errors,
     aeson,
+    async,
     bencode,
     blaze-html,
     bytestring,
@@ -109,6 +110,7 @@ library
     mtl,
     network-uri,
     random,
+    resourcet,
     resource-pool,
     template-haskell,
     postgresql-simple,
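
The commit message describes one core pattern: start every task as its own async right away, but make each task body acquire a semaphore first, so only `batchSize` bodies make progress at any time and a waiting task starts the moment a running one finishes. Below is a minimal standalone sketch of just that pattern, without the Conduit/ResourceT/OpenTelemetry machinery of `runConcurrentlyBunched'`. It only assumes the `unliftio` package the patch already depends on; the module and the name `boundedConcurrently` are made up for illustration, and unlike `runConcurrentlyBunched'` it collects all results at the end instead of streaming finished prefixes (and it cancels the remaining actions if one of them throws).

{-# LANGUAGE NumericUnderscores #-}

-- Hypothetical sketch, not part of the patch: semaphore-limited concurrency
-- in the style described in the commit message.
module BoundedConcurrencySketch where

import UnliftIO (newQSem, withQSem)
import UnliftIO.Async (mapConcurrently)
import UnliftIO.Concurrent (threadDelay)

-- | Start all actions concurrently, but let at most @limit@ of them run
-- their body at the same time; results come back in input order.
boundedConcurrently :: Int -> [IO a] -> IO [a]
boundedConcurrently limit acts = do
  qsem <- newQSem limit
  -- Every action gets its own thread immediately, but each body first has to
  -- acquire the semaphore, so only @limit@ bodies run at once; as soon as one
  -- finishes, the next blocked one proceeds.
  mapConcurrently (withQSem qsem) acts

main :: IO ()
main = do
  results <- boundedConcurrently 5 [threadDelay 100_000 >> pure n | n <- [1 :: Int .. 20]]
  print results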