Skip to content

Commit

Permalink
consumer kafka compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
LoW0lf committed Apr 16, 2015
1 parent 38e7b15 commit 1c2795f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
Binary file added data/fetchResponse_hmb
Binary file not shown.
37 changes: 23 additions & 14 deletions src/HMB/Internal/Handler.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,35 @@ sendProduceResponse socket responsemessage = do
requestLog :: Request -> IO [Log]
requestLog req = mapM readLog [
(BS.unpack(topicName x), fromIntegral(rqFtPartitionNumber y))
| x <- rqFtTopics req, y <- partitions x
| x <- rqFtTopics req, y <- partitions x
]

packMsToFtRsPayload :: MessageSet -> RsFtPayload
packMsToFtRsPayload ms = RsFtPayload 0 0 0 (fromIntegral $ len ms) ms

packLogToFtRsPayload :: Log -> [RsFtPayload]
packLogToFtRsPayload log = map packMsToFtRsPayload log

packLogToFtRs :: Log -> Response
packLogToFtRs log = FetchResponse
(fromIntegral $ BS.length $ BS.pack("myTopic"))
(BS.pack("myTopic"))
(fromIntegral $ length log)
(packLogToFtRsPayload log)
packPartitionsToFtRsPayload :: TopicName -> Partition -> IO RsFtPayload
packPartitionsToFtRsPayload t p = do
log <- readLog (BS.unpack $ t, fromIntegral $ rqFtPartitionNumber p)
return $ RsFtPayload
0
0
0
(fromIntegral $ BL.length $ foldl (\acc ms -> BL.append acc (buildMessageSet ms)) BL.empty log)
log

--packLogToFtRsPayload :: Log -> [RsFtPayload]
--packLogToFtRsPayload log = packMsToFtRsPayload log

packLogToFtRs :: Topic -> IO Response
packLogToFtRs t = do
rss <- (mapM (packPartitionsToFtRsPayload $ topicName t) $ partitions t)
return $ FetchResponse
(topicNameLen t)
(topicName t)
(numPartitions t )
rss

handleFetchRequest :: Request -> Socket -> IO()
handleFetchRequest req sock = do
logs <- requestLog req
let rs = map packLogToFtRs logs
rs <- mapM packLogToFtRs (rqFtTopics req)
let rsms = ResponseMessage 0 1 rs
print rsms
let msg = buildFtRsMessage rsms
Expand Down
2 changes: 2 additions & 0 deletions src/HMB/Internal/Log/Writer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,5 @@ parseLog a = do
readLog :: (String, Int) -> IO Log
readLog (t, p) = parseLog $
getPath (logFolder t p) (logFile 0)


0 comments on commit 1c2795f

Please sign in to comment.