diff --git a/users/Profpatsch/whatcd-resolver/src/Redacted.hs b/users/Profpatsch/whatcd-resolver/src/Redacted.hs index 50ad5de33..eeecd84cf 100644 --- a/users/Profpatsch/whatcd-resolver/src/Redacted.hs +++ b/users/Profpatsch/whatcd-resolver/src/Redacted.hs @@ -11,6 +11,7 @@ import Conduit (ConduitT) import Conduit qualified as Cond import Control.Monad.Logger.CallStack import Control.Monad.Reader +import Control.Monad.Trans.Resource (resourceForkWith) import Data.Aeson qualified as Json import Data.Aeson.BetterErrors qualified as Json import Data.Aeson.Key qualified as Key @@ -35,6 +36,7 @@ import MyLabel import MyPrelude import Network.HTTP.Types import Network.Wai.Parse qualified as Wai +import OpenTelemetry.Context.ThreadLocal qualified as Otel import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan') import Optional import Parse (Parse, mapLookup, mapLookupMay, runParse) @@ -42,7 +44,12 @@ import Parse qualified import Postgres.Decoder qualified as Dec import Postgres.MonadPostgres import Pretty -import UnliftIO (MonadUnliftIO) +import RevList (RevList) +import RevList qualified +import UnliftIO (MonadUnliftIO, askRunInIO, async, newQSem, withQSem) +import UnliftIO.Async (Async) +import UnliftIO.Async qualified as Async +import UnliftIO.Concurrent (threadDelay) import Prelude hiding (length, span) class MonadRedacted m where @@ -330,11 +337,62 @@ parseTourGroups opts = ) ) +testChunkBetween :: IO [NonEmpty Integer] +testChunkBetween = do + Cond.runConduit $ + ( do + Cond.yield [0] + Cond.yield [1, 2] + Cond.yield [3, 4] + Cond.yield [] + Cond.yield [5, 6] + Cond.yield [7] + ) + .| filterEmpty + .| chunkBetween (t2 #low 2 #high 4) + .| Cond.sinkList + +filterEmpty :: (Monad m) => ConduitT [a] (NonEmpty a) m () +filterEmpty = Cond.awaitForever yieldIfNonEmpty + +-- | Chunk the given stream of lists into chunks that are between @low@ and @high@ (both incl). +-- The last chunk might be shorter than low. +chunkBetween :: + ( HasField "low" p Natural, + HasField "high" p Natural, + Monad m + ) => + p -> + ConduitT (NonEmpty a) (NonEmpty a) m () +chunkBetween opts = do + go mempty + where + low = min opts.low opts.high + high = max opts.low opts.high + high' = assertField (boundedNatural @Int) high + l = lengthNatural + go !acc = do + if l acc >= low + then do + let (c, rest) = List.splitAt high' acc + yieldIfNonEmpty c + go rest + else do + xs <- Cond.await + case xs of + Nothing -> yieldIfNonEmpty acc + Just xs' -> go (acc <> NonEmpty.toList xs') + +yieldIfNonEmpty :: (Monad m) => [a] -> ConduitT i (NonEmpty a) m () +yieldIfNonEmpty = \case + IsNonEmpty xs -> Cond.yield xs + IsEmpty -> pure () + redactedPagedSearchAndInsert :: forall m. ( MonadLogger m, MonadPostgres m, - MonadUnliftIO m + MonadOtel m ) => Json.Parse ErrorTree TourGroups -> -- | A redacted request that returns a paged result @@ -350,13 +408,15 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do let remainingPages = firstPage.pages - 1 logInfo [fmt|Got the first page, found {remainingPages} more pages|] let otherPagesNum = [(2 :: Natural) .. remainingPages] - let withFirst = do - firstBlock <- Cond.await - Cond.yield (firstBlock & maybe [firstPage] (firstPage :)) - Cond.awaitForever pure + Cond.runConduit @(Transaction m) $ - runConcurrentlyBunched (lbl #batchSize 5) (go . Just <$> otherPagesNum) - .| withFirst + ( do + Cond.yield (singleton firstPage) + case otherPagesNum of + IsNonEmpty o -> runConcurrentlyBunched' (lbl #batchSize 5) (go . Just <$> o) + IsEmpty -> pure () + ) + .| chunkBetween (t2 #low 5 #high 10) .| Cond.mapMC ( \block -> block @@ -386,7 +446,8 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do [T2 "torrentId" Int "fullJsonResult" Json.Value] ) -> Transaction m () - insertTourGroupsAndTorrents dat = do + insertTourGroupsAndTorrents dat = inSpan' "Insert Tour Groups & Torrents" $ \span -> do + addAttribute span "tour_group.length" (dat & lengthNatural, naturalDecimalT) let tourGroups = dat <&> (.tourGroup) let torrents = dat <&> (.torrents) insertTourGroups tourGroups @@ -493,25 +554,97 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do pure () -- | Traverse over the given function in parallel, but only allow a certain amount of concurrent requests. -runConcurrentlyBunched :: +-- Will start new threads as soon as a resource becomes available, but always return results in input ordering. +runConcurrentlyBunched' :: forall m opts a. ( MonadUnliftIO m, HasField "batchSize" opts Natural ) => opts -> -- | list of actions to run - [m a] -> - ConduitT () [a] m () -runConcurrentlyBunched opts acts = do + NonEmpty (m a) -> + ConduitT () (NonEmpty a) m () +runConcurrentlyBunched' opts acts = do let batchSize = assertField (boundedNatural @Int) opts.batchSize - let go :: [m a] -> ConduitT () [a] m () - go [] = pure () - go acts' = do - let (batch, rest) = splitAt batchSize acts' - res <- lift $ mapConcurrentlyTraced id batch - Cond.yield res - go rest - go acts + runInIO <- lift askRunInIO + -- NB: make sure none of the asyncs escape from here + Cond.transPipe (Cond.runResourceT @m) $ do + -- This use of resourceForkWith looks a little off, but it’s the only way to return an `Async a`, I hope it brackets correctly lol + ctx <- Otel.getContext + let spawn :: m a -> Cond.ResourceT m (Async a) + spawn f = resourceForkWith (\io -> async (io >> Otel.attachContext ctx >> runInIO f)) (pure ()) + qsem <- newQSem batchSize + + -- spawn all asyncs here, but limit how many get run consecutively by threading through a semaphore + spawned <- for acts $ \act -> + lift $ spawn $ withQSem qsem $ act + + Cond.yieldMany (spawned & NonEmpty.toList) + .| awaitAllReadyAsyncs + +-- | Consume as many asyncs as are ready and return their results. +-- +-- Make sure they are already running (if you use 'Cond.yieldM' they are only started when awaited by the conduit). +-- +-- If any async throws an exception, the exception will be thrown in the conduit. +-- Already running asyncs will not be cancelled. (TODO: can we somehow make that a thing?) +awaitAllReadyAsyncs :: forall m a. (MonadIO m) => ConduitT (Async a) (NonEmpty a) m () +awaitAllReadyAsyncs = go + where + -- wait for the next async and then consume as many as are already done + go :: ConduitT (Async a) (NonEmpty a) m () + go = do + Cond.await >>= \case + Nothing -> pure () + Just nextAsync -> do + res <- Async.wait nextAsync + -- consume as many asyncs as are already done + goAllReady (RevList.singleton res) + + goAllReady :: RevList a -> ConduitT (Async a) (NonEmpty a) m () + goAllReady !acc = do + next <- Cond.await + case next of + Nothing -> yieldIfNonEmptyRev acc + Just a -> do + thereAlready <- Async.poll a + case thereAlready of + Nothing -> do + -- consumed everything that was available, yield and wait for the next block + yieldIfNonEmptyRev acc + Cond.leftover a + go + Just _ -> do + -- will not block + res <- Async.wait a + goAllReady (acc <> RevList.singleton res) + + yieldIfNonEmptyRev :: RevList a -> ConduitT (Async a) (NonEmpty a) m () + yieldIfNonEmptyRev r = do + case r & RevList.revListToList of + IsNonEmpty e -> Cond.yield e + IsEmpty -> pure () + +testAwaitAllReadyAsyncs :: IO [[Char]] +testAwaitAllReadyAsyncs = + Cond.runConduit $ + ( do + running <- + lift $ + sequence + [ async (print "foo" >> pure 'a'), + async (pure 'b'), + async (threadDelay 5000 >> pure 'c'), + async (print "bar" >> pure '5'), + async (threadDelay 1_000_000 >> print "lol" >> pure 'd'), + async (error "no"), + async (print "baz" >> pure 'f') + ] + Cond.yieldMany running + ) + .| awaitAllReadyAsyncs + .| Cond.mapC toList + .| Cond.sinkList -- | Run the field parser and throw an uncatchable assertion error if it fails. assertField :: (HasCallStack) => FieldParser from to -> from -> to diff --git a/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal b/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal index 0f798a443..1d4ee097c 100644 --- a/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal +++ b/users/Profpatsch/whatcd-resolver/whatcd-resolver.cabal @@ -89,6 +89,7 @@ library pa-run-command, aeson-better-errors, aeson, + async, bencode, blaze-html, bytestring, @@ -109,6 +110,7 @@ library mtl, network-uri, random, + resourcet, resource-pool, template-haskell, postgresql-simple,