From ae0e75aaf2c3ed81742b30f1f42c081cb01654cd Mon Sep 17 00:00:00 2001
From: Profpatsch
Date: Tue, 11 Mar 2025 13:10:28 +0100
Subject: [PATCH] feat(users/Profpatsch/whatcd-resolver): parallelize search pages

This bunches up 5 search page requests to run at the same time.

We use a conduit now, so we could get smart about returning partial
results and such (if, for example, upstream puts us into the rate
limit, which they do after 10 requests).

Change-Id: Idbb174334fa499c16b3426a8d129deaf3a1d3b0b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13245
Tested-by: BuildkiteCI
Reviewed-by: Profpatsch
---
 users/Profpatsch/my-prelude/src/Json.hs       |  35 +++--
 users/Profpatsch/whatcd-resolver/src/AppT.hs  |  37 +++++
 .../whatcd-resolver/src/Redacted.hs           | 132 ++++++++++++------
 .../whatcd-resolver/src/WhatcdResolver.hs     |  42 +-----
 4 files changed, 156 insertions(+), 90 deletions(-)

diff --git a/users/Profpatsch/my-prelude/src/Json.hs b/users/Profpatsch/my-prelude/src/Json.hs
index 5fbc57eff..808395163 100644
--- a/users/Profpatsch/my-prelude/src/Json.hs
+++ b/users/Profpatsch/my-prelude/src/Json.hs
@@ -213,11 +213,11 @@ asUtcTimeLenient = Field.toJsonParser (Field.jsonString >>> Field.utcTimeLenient
 -- We don’t provide a version that infers the json object key,
 -- since that conflates internal naming with the external API, which is dangerous.
 --
--- @@
+-- @
 -- do
 --   txt <- keyLabel @"myLabel" "jsonKeyName" Json.asText
 --   pure (txt :: Label "myLabel" Text)
--- @@
+-- @
 keyLabel ::
   forall label err m a.
   (Monad m) =>
@@ -230,11 +230,11 @@ keyLabel = do
 -- | Parse a key from the object, à la 'Json.key', return a labelled value.
 -- Version of 'keyLabel' that requires a proxy.
 --
--- @@
+-- @
 -- do
 --   txt <- keyLabel' (Proxy @"myLabel") "jsonKeyName" Json.asText
 --   pure (txt :: Label "myLabel" Text)
--- @@
+-- @
 keyLabel' ::
   forall label err m a.
   (Monad m) =>
@@ -249,11 +249,11 @@ keyLabel' Proxy key parser = label @label <$> Json.key key parser
 -- We don’t provide a version that infers the json object key,
 -- since that conflates internal naming with the external API, which is dangerous.
 --
--- @@
+-- @
 -- do
 --   txt <- keyLabelMay @"myLabel" "jsonKeyName" Json.asText
 --   pure (txt :: Label "myLabel" (Maybe Text))
--- @@
+-- @
 keyLabelMay ::
   forall label err m a.
   (Monad m) =>
@@ -263,14 +263,33 @@ keyLabelMay ::
 keyLabelMay = do
   keyLabelMay' (Proxy @label)
 
+-- | Parse an optional key from the object. The inner parser’s return value has to be a Monoid,
+-- and we collapse the missing key into its 'mempty'.
+--
+-- For example, if the inner parser returns a list, the missing key will be parsed as an empty list.
+--
+-- @
+-- do
+--   txt <- keyMayMempty "jsonKeyName" (Json.eachInArray Json.asText)
+--   pure (txt :: [Text])
+-- @
+--
+-- will return @[]@ if the key is missing or if the value is the empty array.
+keyMayMempty ::
+  (Monad m, Monoid a) =>
+  Text ->
+  Json.ParseT err m a ->
+  Json.ParseT err m a
+keyMayMempty key parser = Json.keyMay key parser <&> fromMaybe mempty
+
 -- | Parse an optional key from the object, à la 'Json.keyMay', return a labelled value.
 -- Version of 'keyLabelMay' that requires a proxy.
 --
--- @@
+-- @
 -- do
 --   txt <- keyLabelMay' (Proxy @"myLabel") "jsonKeyName" Json.asText
 --   pure (txt :: Label "myLabel" (Maybe Text))
--- @@
+-- @
 keyLabelMay' ::
   forall label err m a.
   (Monad m) =>
diff --git a/users/Profpatsch/whatcd-resolver/src/AppT.hs b/users/Profpatsch/whatcd-resolver/src/AppT.hs
index 87e88dac5..5397f74d3 100644
--- a/users/Profpatsch/whatcd-resolver/src/AppT.hs
+++ b/users/Profpatsch/whatcd-resolver/src/AppT.hs
@@ -25,6 +25,7 @@ import Json.Enc
 import Json.Enc qualified as Enc
 import Label
 import MyPrelude
+import OpenTelemetry.Context.ThreadLocal qualified as Otel
 import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
 import OpenTelemetry.Trace.Core qualified as Otel hiding (inSpan, inSpan')
 import OpenTelemetry.Trace.Monad qualified as Otel
@@ -190,6 +191,42 @@ recordException span dat = liftIO $ do
           ..
         }
 
+-- * Async wrappers with Otel tracing
+
+withAsyncTraced :: (MonadUnliftIO m) => m a -> (Async a -> m b) -> m b
+withAsyncTraced act f = do
+  ctx <- Otel.getContext
+  withAsync
+    ( do
+        _old <- Otel.attachContext ctx
+        act
+    )
+    f
+
+-- | Run two actions concurrently, and add them to the current Otel trace
+concurrentlyTraced :: (MonadUnliftIO m) => m a -> m b -> m (a, b)
+concurrentlyTraced act1 act2 = do
+  ctx <- Otel.getContext
+  concurrently
+    ( do
+        _old <- Otel.attachContext ctx
+        act1
+    )
+    ( do
+        _old <- Otel.attachContext ctx
+        act2
+    )
+
+mapConcurrentlyTraced :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
+mapConcurrentlyTraced f t = do
+  ctx <- Otel.getContext
+  mapConcurrently
+    ( \a -> do
+        _old <- Otel.attachContext ctx
+        f a
+    )
+    t
+
 -- * Postgres
 
 instance (MonadThrow m, MonadUnliftIO m) => MonadPostgres (AppT m) where
diff --git a/users/Profpatsch/whatcd-resolver/src/Redacted.hs b/users/Profpatsch/whatcd-resolver/src/Redacted.hs
index 422f5c89b..50ad5de33 100644
--- a/users/Profpatsch/whatcd-resolver/src/Redacted.hs
+++ b/users/Profpatsch/whatcd-resolver/src/Redacted.hs
@@ -7,6 +7,8 @@ import Arg
 import Bencode
 import Builder
 import Comparison
+import Conduit (ConduitT)
+import Conduit qualified as Cond
 import Control.Monad.Logger.CallStack
 import Control.Monad.Reader
 import Data.Aeson qualified as Json
@@ -14,6 +16,7 @@ import Data.Aeson.BetterErrors qualified as Json
 import Data.Aeson.Key qualified as Key
 import Data.Aeson.KeyMap qualified as KeyMap
 import Data.BEncode (BEncode)
+import Data.Conduit ((.|))
 import Data.Error.Tree
 import Data.List qualified as List
 import Data.List.NonEmpty qualified as NonEmpty
@@ -23,6 +26,7 @@ import Data.Time (NominalDiffTime, UTCTime)
 import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
 import Database.PostgreSQL.Simple (Binary (Binary), Only (..))
 import Database.PostgreSQL.Simple.Types (PGArray (PGArray))
+import FieldParser (FieldParser)
 import FieldParser qualified as Field
 import Http qualified
 import Json qualified
@@ -38,6 +42,7 @@ import Parse qualified
 import Postgres.Decoder qualified as Dec
 import Postgres.MonadPostgres
 import Pretty
+import UnliftIO (MonadUnliftIO)
 import Prelude hiding (length, span)
 
 class MonadRedacted m where
@@ -155,9 +160,9 @@ redactedGetTorrentFile dat = inSpan' "Redacted Get Torrent File" $ \span -> do
 mkRedactedTorrentLink :: Arg "torrentGroupId" Int -> Text
 mkRedactedTorrentLink torrentId = [fmt|https://redacted.sh/torrents.php?id={torrentId.unArg}|]
 
-exampleSearch :: (MonadThrow m, MonadLogger m, MonadPostgres m, MonadOtel m, MonadRedacted m) => m (Transaction m ())
+exampleSearch :: (MonadThrow m, MonadLogger m, MonadPostgres m, MonadOtel m, MonadRedacted m) => Transaction m ()
 exampleSearch = do
-  x1 <-
+  _x1 <-
     redactedSearchAndInsert
       [ ("searchstr", "cherish"),
         ("artistname", "kirinji"),
@@ -166,7 +171,7 @@ exampleSearch = do
         -- ("releasetype", "album"),
         ("order_by", "year")
       ]
-  x3 <-
+  _x3 <-
     redactedSearchAndInsert
       [ ("searchstr", "mouss et hakim"),
         ("artistname", "mouss et hakim"),
@@ -175,7 +180,7 @@ exampleSearch = do
         -- ("releasetype", "album"),
         ("order_by", "year")
      ]
-  x2 <-
+  _x2 <-
     redactedSearchAndInsert
       [ ("searchstr", "thriller"),
         ("artistname", "michael jackson"),
@@ -184,7 +189,7 @@ exampleSearch = do
         -- ("releasetype", "album"),
         ("order_by", "year")
      ]
-  pure (x1 >> x2 >> x3 >> pure ())
+  pure ()
 
 redactedRefreshArtist ::
   ( MonadLogger m,
@@ -195,7 +200,7 @@ redactedRefreshArtist ::
     HasField "artistId" dat Int
   ) =>
   dat ->
-  m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
+  Transaction m (Label "newTorrents" [Label "torrentId" Int])
 redactedRefreshArtist dat = do
   redactedPagedSearchAndInsert
     ( Json.key "torrentgroup" $
@@ -219,7 +224,7 @@ redactedRefreshArtist dat = do
 redactedSearchAndInsert ::
   (MonadLogger m, MonadPostgres m, MonadThrow m, MonadOtel m, MonadRedacted m) =>
   [(ByteString, ByteString)] ->
-  m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
+  Transaction m (Label "newTorrents" [Label "torrentId" Int])
 redactedSearchAndInsert extraArguments =
   redactedPagedSearchAndInsert
     (Json.key "results" $ parseTourGroups (T2 (label @"torrentFieldName" "torrents") (label @"torrentIdName" "torrentId")))
@@ -260,12 +265,20 @@ type TourGroups =
         "tourGroups"
         [ T2
             "tourGroup"
-            (T3 "groupId" Int "groupName" Text "fullJsonResult" Json.Value)
+            TourGroup
             "torrents"
             [T2 "torrentId" Int "fullJsonResult" Json.Value]
         ]
     )
 
+data TourGroup = TourGroup
+  { groupId :: Int,
+    groupName :: Text,
+    fullJsonResult :: Json.Value,
+    -- | Needed for sm0rt request recursion
+    groupArtists :: [Label "artistId" Int]
+  }
+
 parseTourGroups ::
   ( Monad m,
     HasField "torrentFieldName" opts Text,
@@ -282,16 +295,18 @@ parseTourGroups opts =
           -- not a torrent group, maybe some files or something (e.g. guitar tabs see Dream Theater Systematic Chaos)
           Nothing -> pure Nothing
           Just () -> do
-            groupId <- Json.keyLabel @"groupId" "groupId" (Json.asIntegral @_ @Int)
-            groupName <- Json.keyLabel @"groupName" "groupName" Json.asText
+            groupId <- Json.key "groupId" (Json.asIntegral @_ @Int)
+            groupName <- Json.key "groupName" Json.asText
+            groupArtists <-
+              Json.keyMayMempty "artists" $
+                Json.eachInArray $
+                  lbl #artistId <$> Json.key "id" (Json.asIntegral @_ @Int)
             fullJsonResult <-
-              label @"fullJsonResult"
-                <$> ( Json.asObject
-                        -- remove torrents cause they are inserted separately below
-                        <&> KeyMap.filterWithKey (\k _ -> k /= (opts.torrentFieldName & Key.fromText))
-                        <&> Json.Object
-                    )
-            let tourGroup = T3 groupId groupName fullJsonResult
+              Json.asObject
+                -- remove torrents cause they are inserted separately below
+                <&> KeyMap.filterWithKey (\k _ -> k /= (opts.torrentFieldName & Key.fromText))
+                <&> Json.Object
+            let tourGroup = TourGroup {..}
             torrents <- Json.keyLabel @"torrents" opts.torrentFieldName $
               Json.eachInArray $ do
                 torrentId <- Json.keyLabel @"torrentId" opts.torrentIdName (Json.asIntegral @_ @Int)
@@ -318,7 +333,8 @@ parseTourGroups opts =
 redactedPagedSearchAndInsert ::
   forall m.
   ( MonadLogger m,
-    MonadPostgres m
+    MonadPostgres m,
+    MonadUnliftIO m
   ) =>
   Json.Parse ErrorTree TourGroups ->
   -- | A redacted request that returns a paged result
@@ -327,33 +343,45 @@ redactedPagedSearchAndInsert ::
     Json.Parse ErrorTree a ->
     m a
   ) ->
-  m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
+  Transaction m (Label "newTorrents" [Label "torrentId" Int])
 redactedPagedSearchAndInsert innerParser pagedRequest = do
   -- The first search returns the amount of pages, so we use that to query all results piece by piece.
   firstPage <- go Nothing
   let remainingPages = firstPage.pages - 1
   logInfo [fmt|Got the first page, found {remainingPages} more pages|]
   let otherPagesNum = [(2 :: Natural) .. remainingPages]
-  otherPages <- traverse go (Just <$> otherPagesNum)
-  pure $
-    (firstPage : otherPages)
-      & concatMap (.response.tourGroups)
-      & \case
-        IsNonEmpty tgs -> do
-          tgs & insertTourGroupsAndTorrents
-          pure $ label @"newTorrents" (tgs & concatMap (\tg -> tg.torrents <&> getLabel @"torrentId"))
-        IsEmpty -> pure $ label @"newTorrents" []
+  let withFirst = do
+        firstBlock <- Cond.await
+        Cond.yield (firstBlock & maybe [firstPage] (firstPage :))
+        Cond.awaitForever Cond.yield
+  Cond.runConduit @(Transaction m) $
+    runConcurrentlyBunched (lbl #batchSize 5) (go . Just <$> otherPagesNum)
+      .| withFirst
+      .| Cond.mapMC
+        ( \block ->
+            block
+              & concatMap (.response.tourGroups)
+              & \case
+                IsNonEmpty tgs -> do
+                  tgs & insertTourGroupsAndTorrents
+                  pure $ tgs & concatMap (\tg -> tg.torrents <&> getLabel @"torrentId")
+                IsEmpty -> pure []
+        )
+      .| Cond.concatC
+      .| Cond.sinkList
+      <&> label @"newTorrents"
   where
     go mpage =
-      pagedRequest
-        (label @"page" mpage)
-        ( parseRedactedReplyStatus $ innerParser
-        )
+      lift @Transaction $
+        pagedRequest
+          (label @"page" mpage)
+          ( parseRedactedReplyStatus $ innerParser
+          )
     insertTourGroupsAndTorrents ::
       NonEmpty
         ( T2
            "tourGroup"
-            (T3 "groupId" Int "groupName" Text "fullJsonResult" Json.Value)
+            TourGroup
            "torrents"
            [T2 "torrentId" Int "fullJsonResult" Json.Value]
        ) ->
@@ -370,15 +398,7 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
                 (label @"torrents" (torrents & toList))
           )
     insertTourGroups ::
-      NonEmpty
-        ( T3
-            "groupId"
-            Int
-            "groupName"
-            Text
-            "fullJsonResult"
-            Json.Value
-        ) ->
+      NonEmpty TourGroup ->
       Transaction m [Label "tourGroupIdPg" Int]
     insertTourGroups dats = do
       let groupNames =
@@ -472,6 +492,34 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
           )
       pure ()
 
+-- | Run the given actions in parallel, but only allow a certain number of concurrent requests at a time.
+runConcurrentlyBunched ::
+  forall m opts a.
+  ( MonadUnliftIO m,
+    HasField "batchSize" opts Natural
+  ) =>
+  opts ->
+  -- | list of actions to run
+  [m a] ->
+  ConduitT () [a] m ()
+runConcurrentlyBunched opts acts = do
+  let batchSize = assertField (boundedNatural @Int) opts.batchSize
+  let go :: [m a] -> ConduitT () [a] m ()
+      go [] = pure ()
+      go acts' = do
+        let (batch, rest) = splitAt batchSize acts'
+        res <- lift $ mapConcurrentlyTraced id batch
+        Cond.yield res
+        go rest
+  go acts
+
+-- | Run the field parser and throw an uncatchable assertion error if it fails.
+assertField :: (HasCallStack) => FieldParser from to -> from -> to
+assertField parser from = Field.runFieldParser parser from & unwrapError
+
+boundedNatural :: forall i. (Integral i, Bounded i) => FieldParser Natural i
+boundedNatural = lmap naturalToInteger (Field.bounded @i "boundedNatural")
+
 redactedGetTorrentFileAndInsert ::
   ( HasField "torrentId" r Int,
     HasField "useFreeleechTokens" r Bool,
diff --git a/users/Profpatsch/whatcd-resolver/src/WhatcdResolver.hs b/users/Profpatsch/whatcd-resolver/src/WhatcdResolver.hs
index 77f031cfa..3f227549e 100644
--- a/users/Profpatsch/whatcd-resolver/src/WhatcdResolver.hs
+++ b/users/Profpatsch/whatcd-resolver/src/WhatcdResolver.hs
@@ -53,7 +53,6 @@ import Network.Wai qualified as Wai
 import Network.Wai.Handler.Warp qualified as Warp
 import Network.Wai.Parse (parseContentType)
 import OpenTelemetry.Attributes qualified as Otel
-import OpenTelemetry.Context.ThreadLocal qualified as Otel
 import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
 import OpenTelemetry.Trace.Monad qualified as Otel
 import Parse (Parse, showContext)
@@ -148,9 +147,8 @@ htmlUi = do
         \dat _span ->
           ( pure $ htmlPageChrome ourHtmlIntegrities [fmt|whatcd-resolver – Search – {dat.queryArgs.searchstr & bytesToTextUtf8Lenient}|],
             do
-              t <- redactedSearchAndInsert [("searchstr", dat.queryArgs.searchstr)]
               runTransaction $ do
-                res <- t
+                res <- redactedSearchAndInsert [("searchstr", dat.queryArgs.searchstr)]
                 (table, settings) <-
                   concurrentlyTraced
                     ( do
@@ -336,9 +334,7 @@ htmlUi = do
                           >>> (Field.bounded @Int "Int")
                       )
                   )
-              t <- redactedRefreshArtist dat
-              runTransaction $ do
-                t
+              runTransaction $ redactedRefreshArtist dat
               pure $ E22 (label @"redirectTo" $ textToBytesUtf8 $ mkArtistLink dat)
           ),
         ( "autorefresh",
@@ -459,40 +455,6 @@ mainHtml' dat = do
             />
            -->
          |]
 
-withAsyncTraced :: (MonadUnliftIO m) => m a -> (Async a -> m b) -> m b
-withAsyncTraced act f = do
-  ctx <- Otel.getContext
-  withAsync
-    ( do
-        _old <- Otel.attachContext ctx
-        act
-    )
-    f
-
--- | Run two actions concurrently, and add them to the current Otel trace
-concurrentlyTraced :: (MonadUnliftIO m) => m a -> m b -> m (a, b)
-concurrentlyTraced act1 act2 = do
-  ctx <- Otel.getContext
-  concurrently
-    ( do
-        _old <- Otel.attachContext ctx
-        act1
-    )
-    ( do
-        _old <- Otel.attachContext ctx
-        act2
-    )
-
-mapConcurrentlyTraced :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
-mapConcurrentlyTraced f t = do
-  ctx <- Otel.getContext
-  mapConcurrently
-    ( \a -> do
-        _old <- Otel.attachContext ctx
-        f a
-    )
-    t
-
 parseMultipartOrThrow :: (MonadLogger m, MonadIO m, MonadThrow m) => Otel.Span -> Wai.Request -> Multipart.MultipartParseT m a -> m a
 parseMultipartOrThrow span req parser =
   Multipart.parseMultipartOrThrow