Browse Source

using @threadcall

herry 4 years ago
parent
commit
773975f16c
2 changed files with 15 additions and 1 deletions
  1. 1 0
      src/client.jl
  2. 14 1
      src/wrapper.jl

+ 1 - 0
src/client.jl

@@ -34,6 +34,7 @@ end
 
 
 function KafkaClient(typ::Integer, conf::Dict=Dict(); dr_cb=nothing, err_cb=nothing)
+    println("KafkaClient with Herry's Patch")
     c_conf = kafka_conf_new()
     for (k, v) in conf
         kafka_conf_set(c_conf, string(k), string(v))

+ 14 - 1
src/wrapper.jl

@@ -111,10 +111,17 @@ end
 
 
 function kafka_poll(rk::Ptr{Cvoid}, timeout::Integer)
+    
+    #println("DOING KAFKA_POLL")
+    #=
     return ccall((:rd_kafka_poll, librdkafka), Cint,
                  (Ptr{Cvoid}, Cint),
                  rk, timeout)
-    
+                 =#
+    return @threadcall((:rd_kafka_poll, librdkafka), Cint,
+                (Ptr{Cvoid}, Cint),
+                rk, timeout)
+
 end
 
 
@@ -247,8 +254,14 @@ end
 
 
 function kafka_consumer_poll(rk::Ptr{Cvoid}, timeout::Integer)
+    #println("KAFKA_CONSUMER_POLL @threadcall")
+    #=
     msg_ptr = ccall((:rd_kafka_consumer_poll, librdkafka), Ptr{CKafkaMessage},
                     (Ptr{Cvoid}, Cint), rk, timeout)
+    =#
+    msg_ptr = @threadcall((:rd_kafka_consumer_poll, librdkafka), Ptr{CKafkaMessage},
+                    (Ptr{Cvoid}, Cint), rk, timeout)
+
     if msg_ptr != Ptr{CKafkaMessage}(0)
         return msg_ptr
     else