feat(users/Profpatsch/whatcd-resolver): parallelize search pages

This bunches up 5 search page requests to run at the same time.
We use a conduit now, so we could get smart about returning partial
results and such (if for example upstream puts us into the rate limit,
which they do after 10 requests).

Change-Id: Idbb174334fa499c16b3426a8d129deaf3a1d3b0b
Reviewed-on: https://cl.tvl.fyi/c/depot/+/13245
Tested-by: BuildkiteCI
Reviewed-by: Profpatsch <mail@profpatsch.de>
This commit is contained in:
Profpatsch 2025-03-11 13:10:28 +01:00
parent ca6c5ac59e
commit ae0e75aaf2
4 changed files with 156 additions and 90 deletions

View file

@ -213,11 +213,11 @@ asUtcTimeLenient = Field.toJsonParser (Field.jsonString >>> Field.utcTimeLenient
-- We don't provide a version that infers the json object key,
-- since that conflates internal naming with the external API, which is dangerous.
--
-- @@
-- @
-- do
-- txt <- keyLabel @"myLabel" "jsonKeyName" Json.asText
-- pure (txt :: Label "myLabel" Text)
-- @@
-- @
keyLabel ::
forall label err m a.
(Monad m) =>
@ -230,11 +230,11 @@ keyLabel = do
-- | Parse a key from the object, à la 'Json.key', return a labelled value.
-- Version of 'keyLabel' that requires a proxy.
--
-- @@
-- @
-- do
-- txt <- keyLabel' (Proxy @"myLabel") "jsonKeyName" Json.asText
-- pure (txt :: Label "myLabel" Text)
-- @@
-- @
keyLabel' ::
forall label err m a.
(Monad m) =>
@ -249,11 +249,11 @@ keyLabel' Proxy key parser = label @label <$> Json.key key parser
-- We don't provide a version that infers the json object key,
-- since that conflates internal naming with the external API, which is dangerous.
--
-- @@
-- @
-- do
-- txt <- keyLabelMay @"myLabel" "jsonKeyName" Json.asText
-- pure (txt :: Label "myLabel" (Maybe Text))
-- @@
-- @
keyLabelMay ::
forall label err m a.
(Monad m) =>
@ -263,14 +263,33 @@ keyLabelMay ::
keyLabelMay = do
keyLabelMay' (Proxy @label)
-- | Parse an optional key from the object. The inner parser's return value has to be a Monoid,
-- and we collapse the missing key into its 'mempty'.
--
-- For example, if the inner parser returns a list, the missing key will be parsed as an empty list.
--
-- @
-- do
--   txt <- keyMayMempty "jsonKeyName" (Json.eachInArray Json.asText)
--   pure (txt :: [Text])
-- @
--
-- will return @[]@ if the key is missing or if the value is the empty array.
keyMayMempty ::
  (Monad m, Monoid a) =>
  Text ->
  Json.ParseT err m a ->
  Json.ParseT err m a
keyMayMempty key parser = Json.keyMay key parser <&> fromMaybe mempty
-- | Parse an optional key from the object, à la 'Json.keyMay', return a labelled value.
-- Version of 'keyLabelMay' that requires a proxy.
--
-- @@
-- @
-- do
-- txt <- keyLabelMay' (Proxy @"myLabel") "jsonKeyName" Json.asText
-- pure (txt :: Label "myLabel" (Maybe Text))
-- @@
-- @
keyLabelMay' ::
forall label err m a.
(Monad m) =>

View file

@ -25,6 +25,7 @@ import Json.Enc
import Json.Enc qualified as Enc
import Label
import MyPrelude
import OpenTelemetry.Context.ThreadLocal qualified as Otel
import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
import OpenTelemetry.Trace.Core qualified as Otel hiding (inSpan, inSpan')
import OpenTelemetry.Trace.Monad qualified as Otel
@ -190,6 +191,42 @@ recordException span dat = liftIO $ do
..
}
-- * Async wrappers with Otel tracing
-- | Variant of 'withAsync' that re-attaches the caller's Otel context
-- inside the forked thread, so spans created there join the current trace.
withAsyncTraced :: (MonadUnliftIO m) => m a -> (Async a -> m b) -> m b
withAsyncTraced action inner = do
  parentCtx <- Otel.getContext
  let tracedAction = Otel.attachContext parentCtx >> action
  withAsync tracedAction inner
-- | Run two actions concurrently; both re-attach the caller's Otel context
-- first, so their spans are recorded on the current trace.
concurrentlyTraced :: (MonadUnliftIO m) => m a -> m b -> m (a, b)
concurrentlyTraced left right = do
  parentCtx <- Otel.getContext
  let inCtx action = Otel.attachContext parentCtx >> action
  concurrently (inCtx left) (inCtx right)
-- | 'mapConcurrently', except every worker re-attaches the caller's Otel
-- context before running, keeping all spans on the current trace.
mapConcurrentlyTraced :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
mapConcurrentlyTraced worker xs = do
  parentCtx <- Otel.getContext
  mapConcurrently (\x -> Otel.attachContext parentCtx >> worker x) xs
-- * Postgres
instance (MonadThrow m, MonadUnliftIO m) => MonadPostgres (AppT m) where

View file

@ -7,6 +7,8 @@ import Arg
import Bencode
import Builder
import Comparison
import Conduit (ConduitT)
import Conduit qualified as Cond
import Control.Monad.Logger.CallStack
import Control.Monad.Reader
import Data.Aeson qualified as Json
@ -14,6 +16,7 @@ import Data.Aeson.BetterErrors qualified as Json
import Data.Aeson.Key qualified as Key
import Data.Aeson.KeyMap qualified as KeyMap
import Data.BEncode (BEncode)
import Data.Conduit ((.|))
import Data.Error.Tree
import Data.List qualified as List
import Data.List.NonEmpty qualified as NonEmpty
@ -23,6 +26,7 @@ import Data.Time (NominalDiffTime, UTCTime)
import Data.Time.Clock.POSIX (posixSecondsToUTCTime)
import Database.PostgreSQL.Simple (Binary (Binary), Only (..))
import Database.PostgreSQL.Simple.Types (PGArray (PGArray))
import FieldParser (FieldParser)
import FieldParser qualified as Field
import Http qualified
import Json qualified
@ -38,6 +42,7 @@ import Parse qualified
import Postgres.Decoder qualified as Dec
import Postgres.MonadPostgres
import Pretty
import UnliftIO (MonadUnliftIO)
import Prelude hiding (length, span)
class MonadRedacted m where
@ -155,9 +160,9 @@ redactedGetTorrentFile dat = inSpan' "Redacted Get Torrent File" $ \span -> do
mkRedactedTorrentLink :: Arg "torrentGroupId" Int -> Text
mkRedactedTorrentLink torrentId = [fmt|https://redacted.sh/torrents.php?id={torrentId.unArg}|]
exampleSearch :: (MonadThrow m, MonadLogger m, MonadPostgres m, MonadOtel m, MonadRedacted m) => m (Transaction m ())
exampleSearch :: (MonadThrow m, MonadLogger m, MonadPostgres m, MonadOtel m, MonadRedacted m) => Transaction m ()
exampleSearch = do
x1 <-
_x1 <-
redactedSearchAndInsert
[ ("searchstr", "cherish"),
("artistname", "kirinji"),
@ -166,7 +171,7 @@ exampleSearch = do
-- ("releasetype", "album"),
("order_by", "year")
]
x3 <-
_x3 <-
redactedSearchAndInsert
[ ("searchstr", "mouss et hakim"),
("artistname", "mouss et hakim"),
@ -175,7 +180,7 @@ exampleSearch = do
-- ("releasetype", "album"),
("order_by", "year")
]
x2 <-
_x2 <-
redactedSearchAndInsert
[ ("searchstr", "thriller"),
("artistname", "michael jackson"),
@ -184,7 +189,7 @@ exampleSearch = do
-- ("releasetype", "album"),
("order_by", "year")
]
pure (x1 >> x2 >> x3 >> pure ())
pure ()
redactedRefreshArtist ::
( MonadLogger m,
@ -195,7 +200,7 @@ redactedRefreshArtist ::
HasField "artistId" dat Int
) =>
dat ->
m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
Transaction m (Label "newTorrents" [Label "torrentId" Int])
redactedRefreshArtist dat = do
redactedPagedSearchAndInsert
( Json.key "torrentgroup" $
@ -219,7 +224,7 @@ redactedRefreshArtist dat = do
redactedSearchAndInsert ::
(MonadLogger m, MonadPostgres m, MonadThrow m, MonadOtel m, MonadRedacted m) =>
[(ByteString, ByteString)] ->
m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
Transaction m (Label "newTorrents" [Label "torrentId" Int])
redactedSearchAndInsert extraArguments =
redactedPagedSearchAndInsert
(Json.key "results" $ parseTourGroups (T2 (label @"torrentFieldName" "torrents") (label @"torrentIdName" "torrentId")))
@ -260,12 +265,20 @@ type TourGroups =
"tourGroups"
[ T2
"tourGroup"
(T3 "groupId" Int "groupName" Text "fullJsonResult" Json.Value)
TourGroup
"torrents"
[T2 "torrentId" Int "fullJsonResult" Json.Value]
]
)
-- | One torrent group as parsed out of a redacted API reply.
data TourGroup = TourGroup
  { -- | Numeric group id, parsed from the JSON @groupId@ key
    groupId :: Int,
    -- | Group name, parsed from the JSON @groupName@ key
    groupName :: Text,
    -- | The full JSON object of the group, with the torrent list key
    -- filtered out (torrents are inserted separately)
    fullJsonResult :: Json.Value,
    -- | Needed for smart request recursion; artist ids from the group's
    -- optional @artists@ array (empty when the key is missing)
    groupArtists :: [Label "artistId" Int]
  }
parseTourGroups ::
( Monad m,
HasField "torrentFieldName" opts Text,
@ -282,16 +295,18 @@ parseTourGroups opts =
-- not a torrent group, maybe some files or something (e.g. guitar tabs see Dream Theater Systematic Chaos)
Nothing -> pure Nothing
Just () -> do
groupId <- Json.keyLabel @"groupId" "groupId" (Json.asIntegral @_ @Int)
groupName <- Json.keyLabel @"groupName" "groupName" Json.asText
groupId <- Json.key "groupId" (Json.asIntegral @_ @Int)
groupName <- Json.key "groupName" Json.asText
groupArtists <-
Json.keyMayMempty "artists" $
Json.eachInArray $
lbl #artistId <$> Json.key "id" (Json.asIntegral @_ @Int)
fullJsonResult <-
label @"fullJsonResult"
<$> ( Json.asObject
-- remove torrents cause they are inserted separately below
<&> KeyMap.filterWithKey (\k _ -> k /= (opts.torrentFieldName & Key.fromText))
<&> Json.Object
)
let tourGroup = T3 groupId groupName fullJsonResult
Json.asObject
-- remove torrents cause they are inserted separately below
<&> KeyMap.filterWithKey (\k _ -> k /= (opts.torrentFieldName & Key.fromText))
<&> Json.Object
let tourGroup = TourGroup {..}
torrents <- Json.keyLabel @"torrents" opts.torrentFieldName $
Json.eachInArray $ do
torrentId <- Json.keyLabel @"torrentId" opts.torrentIdName (Json.asIntegral @_ @Int)
@ -318,7 +333,8 @@ parseTourGroups opts =
redactedPagedSearchAndInsert ::
forall m.
( MonadLogger m,
MonadPostgres m
MonadPostgres m,
MonadUnliftIO m
) =>
Json.Parse ErrorTree TourGroups ->
-- | A redacted request that returns a paged result
@ -327,33 +343,45 @@ redactedPagedSearchAndInsert ::
Json.Parse ErrorTree a ->
m a
) ->
m (Transaction m (Label "newTorrents" [Label "torrentId" Int]))
Transaction m (Label "newTorrents" [Label "torrentId" Int])
redactedPagedSearchAndInsert innerParser pagedRequest = do
-- The first search returns the amount of pages, so we use that to query all results piece by piece.
firstPage <- go Nothing
let remainingPages = firstPage.pages - 1
logInfo [fmt|Got the first page, found {remainingPages} more pages|]
let otherPagesNum = [(2 :: Natural) .. remainingPages]
otherPages <- traverse go (Just <$> otherPagesNum)
pure $
(firstPage : otherPages)
& concatMap (.response.tourGroups)
& \case
IsNonEmpty tgs -> do
tgs & insertTourGroupsAndTorrents
pure $ label @"newTorrents" (tgs & concatMap (\tg -> tg.torrents <&> getLabel @"torrentId"))
IsEmpty -> pure $ label @"newTorrents" []
let withFirst = do
firstBlock <- Cond.await
Cond.yield (firstBlock & maybe [firstPage] (firstPage :))
Cond.awaitForever pure
Cond.runConduit @(Transaction m) $
runConcurrentlyBunched (lbl #batchSize 5) (go . Just <$> otherPagesNum)
.| withFirst
.| Cond.mapMC
( \block ->
block
& concatMap (.response.tourGroups)
& \case
IsNonEmpty tgs -> do
tgs & insertTourGroupsAndTorrents
pure $ tgs & concatMap (\tg -> tg.torrents <&> getLabel @"torrentId")
IsEmpty -> pure []
)
.| Cond.concatC
.| Cond.sinkList
<&> label @"newTorrents"
where
go mpage =
pagedRequest
(label @"page" mpage)
( parseRedactedReplyStatus $ innerParser
)
lift @Transaction $
pagedRequest
(label @"page" mpage)
( parseRedactedReplyStatus $ innerParser
)
insertTourGroupsAndTorrents ::
NonEmpty
( T2
"tourGroup"
(T3 "groupId" Int "groupName" Text "fullJsonResult" Json.Value)
TourGroup
"torrents"
[T2 "torrentId" Int "fullJsonResult" Json.Value]
) ->
@ -370,15 +398,7 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
(label @"torrents" (torrents & toList))
)
insertTourGroups ::
NonEmpty
( T3
"groupId"
Int
"groupName"
Text
"fullJsonResult"
Json.Value
) ->
NonEmpty TourGroup ->
Transaction m [Label "tourGroupIdPg" Int]
insertTourGroups dats = do
let groupNames =
@ -472,6 +492,34 @@ redactedPagedSearchAndInsert innerParser pagedRequest = do
)
pure ()
-- | Run the given actions in parallel, but only allow a certain amount of
-- concurrent requests: execute the actions in batches of @opts.batchSize@
-- and yield each finished batch downstream.
--
-- A @batchSize@ of 0 is clamped to 1; otherwise @splitAt 0@ would leave the
-- list untouched and 'go' would loop forever without making progress.
runConcurrentlyBunched ::
  forall m opts a.
  ( MonadUnliftIO m,
    HasField "batchSize" opts Natural
  ) =>
  opts ->
  -- | list of actions to run
  [m a] ->
  ConduitT () [a] m ()
runConcurrentlyBunched opts acts = do
  -- assertion error if the Natural does not fit into Int
  let batchSize = max 1 (assertField (boundedNatural @Int) opts.batchSize)
  let go :: [m a] -> ConduitT () [a] m ()
      go [] = pure ()
      go acts' = do
        let (batch, rest) = splitAt batchSize acts'
        -- run one batch concurrently, propagating the current Otel context
        res <- lift $ mapConcurrentlyTraced id batch
        Cond.yield res
        go rest
  go acts
-- | Apply a 'FieldParser'; on parse failure, abort with an uncatchable
-- assertion error (via 'unwrapError').
assertField :: (HasCallStack) => FieldParser from to -> from -> to
assertField parser input = unwrapError (Field.runFieldParser parser input)
-- | Field parser from 'Natural' into any bounded integral type,
-- failing when the value does not fit into the target type's bounds.
boundedNatural :: forall i. (Integral i, Bounded i) => FieldParser Natural i
boundedNatural = Field.bounded @i "boundedNatural" & lmap naturalToInteger
redactedGetTorrentFileAndInsert ::
( HasField "torrentId" r Int,
HasField "useFreeleechTokens" r Bool,

View file

@ -53,7 +53,6 @@ import Network.Wai qualified as Wai
import Network.Wai.Handler.Warp qualified as Warp
import Network.Wai.Parse (parseContentType)
import OpenTelemetry.Attributes qualified as Otel
import OpenTelemetry.Context.ThreadLocal qualified as Otel
import OpenTelemetry.Trace qualified as Otel hiding (getTracer, inSpan, inSpan')
import OpenTelemetry.Trace.Monad qualified as Otel
import Parse (Parse, showContext)
@ -148,9 +147,8 @@ htmlUi = do
\dat _span ->
( pure $ htmlPageChrome ourHtmlIntegrities [fmt|whatcd-resolver Search {dat.queryArgs.searchstr & bytesToTextUtf8Lenient}|],
do
t <- redactedSearchAndInsert [("searchstr", dat.queryArgs.searchstr)]
runTransaction $ do
res <- t
res <- redactedSearchAndInsert [("searchstr", dat.queryArgs.searchstr)]
(table, settings) <-
concurrentlyTraced
( do
@ -336,9 +334,7 @@ htmlUi = do
>>> (Field.bounded @Int "Int")
)
)
t <- redactedRefreshArtist dat
runTransaction $ do
t
runTransaction $ redactedRefreshArtist dat
pure $ E22 (label @"redirectTo" $ textToBytesUtf8 $ mkArtistLink dat)
),
( "autorefresh",
@ -459,40 +455,6 @@ mainHtml' dat = do
/> -->
|]
-- | 'withAsync' variant that carries the current Otel context into the
-- forked thread, so its spans land on the ongoing trace.
withAsyncTraced :: (MonadUnliftIO m) => m a -> (Async a -> m b) -> m b
withAsyncTraced body use = do
  currentCtx <- Otel.getContext
  withAsync (Otel.attachContext currentCtx >> body) use
-- | Run two actions concurrently, and add them to the current Otel trace
concurrentlyTraced :: (MonadUnliftIO m) => m a -> m b -> m (a, b)
concurrentlyTraced firstAct secondAct = do
  currentCtx <- Otel.getContext
  let attached action = Otel.attachContext currentCtx >> action
  concurrently (attached firstAct) (attached secondAct)
-- | Concurrent traversal where every worker first attaches the caller's
-- Otel context, keeping spans on the current trace.
mapConcurrentlyTraced :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
mapConcurrentlyTraced step container = do
  currentCtx <- Otel.getContext
  mapConcurrently (\item -> Otel.attachContext currentCtx >> step item) container
parseMultipartOrThrow :: (MonadLogger m, MonadIO m, MonadThrow m) => Otel.Span -> Wai.Request -> Multipart.MultipartParseT m a -> m a
parseMultipartOrThrow span req parser =
Multipart.parseMultipartOrThrow