wrapper.jl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. ## wrapper for librdkafka C API, see:
  2. ## https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h
  3. function rd_kafka_version()
  4. ccall((:rd_kafka_version, RDKafka.librdkafka), Cint, ())
  5. end
  6. ## rd_kafka_conf_t
  7. function kafka_conf_new()
  8. return ccall((:rd_kafka_conf_new, librdkafka), Ptr{Cvoid}, ())
  9. end
  10. function kafka_conf_destroy(conf::Ptr{Cvoid})
  11. ccall((:rd_kafka_conf_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), conf)
  12. end
  13. function kafka_conf_set(conf::Ptr{Cvoid}, key::String, val::String)
  14. err_str = zeros(UInt8, 512)
  15. return ccall((:rd_kafka_conf_set, librdkafka), Cvoid,
  16. (Ptr{Cvoid}, Cstring, Cstring, Ptr{UInt8}, Csize_t),
  17. conf, key, val, pointer(err_str), sizeof(err_str))
  18. end
  19. function kafka_conf_get(conf::Ptr{Cvoid}, key::String)
  20. dest = zeros(UInt8, 512)
  21. sz_ref = Ref{Csize_t}(0)
  22. ccall((:rd_kafka_conf_get, librdkafka), Cvoid,
  23. (Ptr{Cvoid}, Cstring, Ptr{UInt8}, Ptr{Csize_t}),
  24. conf, key, pointer(dest), sz_ref)
  25. return unsafe_string(pointer(dest))
  26. end
  27. function kafka_conf_set_error_cb(conf::Ptr{Cvoid}, c_fn::Ptr{Cvoid})
  28. ccall((:rd_kafka_conf_set_error_cb, librdkafka), Cvoid,
  29. (Ptr{Cvoid}, Ptr{Cvoid}), conf, c_fn)
  30. end
  31. function kafka_conf_set_dr_msg_cb(conf::Ptr{Cvoid}, c_fn::Ptr{Cvoid})
  32. ccall((:rd_kafka_conf_set_dr_msg_cb, librdkafka), Cvoid,
  33. (Ptr{Cvoid}, Ptr{Cvoid}), conf, c_fn)
  34. end
  35. ## rd_kafka_t
  36. const KAFKA_TYPE_PRODUCER = Cint(0)
  37. const KAFKA_TYPE_CONSUMER = Cint(1)
  38. function kafka_new(conf::Ptr{Cvoid}, kafka_type::Cint)
  39. err_str = zeros(UInt8, 512)
  40. client = ccall((:rd_kafka_new, librdkafka),
  41. Ptr{Cvoid},
  42. (Cint, Ptr{Cvoid}, Ptr{UInt8}, Csize_t),
  43. kafka_type, conf, pointer(err_str), sizeof(err_str))
  44. return client
  45. end
  46. function kafka_destroy(rk::Ptr{Cvoid})
  47. ccall((:rd_kafka_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), rk)
  48. end
  49. ## rd_kafka_topic_conf_t
  50. function kafka_topic_conf_new()
  51. return ccall((:rd_kafka_topic_conf_new, librdkafka), Ptr{Cvoid}, ())
  52. end
  53. function kafka_topic_conf_destroy(conf::Ptr{Cvoid})
  54. ccall((:rd_kafka_topic_conf_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), conf)
  55. end
  56. function kafka_topic_conf_set(conf::Ptr{Cvoid}, key::String, val::String)
  57. err_str = Array{UInt8}(undef, 512)
  58. return ccall((:rd_kafka_topic_conf_set, librdkafka), Cvoid,
  59. (Ptr{Cvoid}, Cstring, Cstring, Ptr{UInt8}, Csize_t),
  60. conf, key, val, pointer(err_str), sizeof(err_str))
  61. end
  62. function kafka_topic_conf_get(conf::Ptr{Cvoid}, key::String)
  63. dest = Array{UInt8}(undef, 512)
  64. sz_ref = Ref{Csize_t}(0)
  65. ccall((:rd_kafka_topic_conf_get, librdkafka), Cvoid,
  66. (Ptr{Cvoid}, Cstring, Ptr{UInt8}, Ptr{Csize_t}),
  67. conf, key, pointer(dest), sz_ref)
  68. return unsafe_string(pointer(dest))
  69. end
  70. # rd_kafka_topic_t
  71. function kafka_topic_new(rk::Ptr{Cvoid}, topic::String, topic_conf::Ptr{Cvoid})
  72. return ccall((:rd_kafka_topic_new, librdkafka), Ptr{Cvoid},
  73. (Ptr{Cvoid}, Cstring, Ptr{Cvoid}),
  74. rk, topic, topic_conf)
  75. end
  76. function kafka_topic_destroy(rkt::Ptr{Cvoid})
  77. ccall((:rd_kafka_topic_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), rkt)
  78. end
  79. function kafka_poll(rk::Ptr{Cvoid}, timeout::Integer)
  80. #println("DOING KAFKA_POLL")
  81. #=
  82. return ccall((:rd_kafka_poll, librdkafka), Cint,
  83. (Ptr{Cvoid}, Cint),
  84. rk, timeout)
  85. =#
  86. return @threadcall((:rd_kafka_poll, librdkafka), Cint,
  87. (Ptr{Cvoid}, Cint),
  88. rk, timeout)
  89. end
  90. function produce(rkt::Ptr{Cvoid}, partition::Integer,
  91. key::Vector{UInt8}, payload::Vector{UInt8})
  92. flags = Cint(0)
  93. errcode = ccall((:rd_kafka_produce, librdkafka), Cint,
  94. (Ptr{Cvoid}, Int32, Cint,
  95. Ptr{Cvoid}, Csize_t,
  96. Ptr{Cvoid}, Csize_t,
  97. Ptr{Cvoid}),
  98. rkt, Int32(partition), flags,
  99. pointer(payload), length(payload),
  100. pointer(key), length(key),
  101. C_NULL)
  102. if errcode != 0
  103. #error("Produce request failed with error code (unix): $errcode")
  104. ## errno is c global var
  105. errnr = unsafe_load(cglobal(:errno, Int32))
  106. errmsg = unsafe_string(ccall(:strerror, Cstring, (Int32,), errnr))
  107. error("Produce request failed with error code: $errmsg")
  108. end
  109. end
  110. ## topic partition list
  111. mutable struct CKafkaTopicPartition
  112. topic::Cstring
  113. partition::Cint
  114. offset::Clonglong
  115. metadata::Ptr{Cvoid}
  116. metadata_size::Csize_t
  117. opaque::Ptr{Cvoid}
  118. err::Cint
  119. _private::Ptr{Cvoid}
  120. end
  121. mutable struct CKafkaTopicPartitionList
  122. cnt::Cint
  123. size::Cint
  124. elems::Ptr{CKafkaTopicPartition}
  125. end
  126. function kafka_topic_partition_list_new(sz::Integer=0)
  127. rkparlist = ccall((:rd_kafka_topic_partition_list_new, librdkafka), Ptr{CKafkaTopicPartitionList},
  128. (Cint,), sz)
  129. if rkparlist != Ptr{CKafkaTopicPartitionList}(0)
  130. return rkparlist
  131. end
  132. return nothing
  133. end
  134. function kafka_topic_partition_list_destroy(rkparlist::Ptr{CKafkaTopicPartitionList})
  135. ccall((:rd_kafka_topic_partition_list_destroy, librdkafka), Cvoid, (Ptr{CKafkaTopicPartitionList},), rkparlist)
  136. end
  137. function kafka_topic_partition_list_add(rkparlist::Ptr{CKafkaTopicPartitionList},
  138. topic::String, partition::Integer)
  139. ccall((:rd_kafka_topic_partition_list_add, librdkafka), Ptr{CKafkaTopicPartition},
  140. (Ptr{CKafkaTopicPartitionList}, Cstring, Int32,), rkparlist, topic, partition)
  141. end
  142. ## partition assignment
  143. function kafka_assignment(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
  144. errcode = ccall((:rd_kafka_assignment, librdkafka), Cint,
  145. (Ptr{Cvoid}, Ref{Ptr{CKafkaTopicPartitionList}}), rk, rkparlist)
  146. if errcode != 0
  147. error("Assignment retrieval failed with error $errcode")
  148. end
  149. end
  150. ## subscribe
  151. function kafka_subscribe(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
  152. errcode = ccall((:rd_kafka_subscribe, librdkafka), Cint,
  153. (Ptr{Cvoid}, Ptr{Cvoid}), rk, rkparlist)
  154. if errcode != 0
  155. error("Subscription failed with error $errcode")
  156. end
  157. end
  158. ## seek
  159. function kafka_offsets_for_times(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList}, timeout::Integer)
  160. errcode = ccall((:rd_kafka_offsets_for_times, librdkafka), Cint,
  161. (Ptr{Cvoid}, Ptr{CKafkaTopicPartitionList}, Cint), rk, rkparlist, timeout)
  162. if errcode != 0
  163. error("kafka_offsets_for_times failed with error $errcode")
  164. end
  165. end
  166. function kafka_seek(rkt::Ptr{Cvoid}, partition::Int32, offset::Int64, timeout::Int=1000)
  167. errcode = ccall((:rd_kafka_seek, librdkafka), Cint,
  168. (Ptr{Cvoid}, Cint, Clonglong, Cint),
  169. rkt, partition, offset, timeout)
  170. if errcode != 0
  171. error("kafka_seek failed with error $errcode")
  172. end
  173. end
  174. function kafka_assign(rk::Ptr{Cvoid}, rkparlist::Ptr{CKafkaTopicPartitionList})
  175. errcode = ccall((:rd_kafka_assign, librdkafka), Cint,
  176. (Ptr{Cvoid}, Ptr{CKafkaTopicPartitionList}),
  177. rk, rkparlist)
  178. if errcode != 0
  179. error("kafka_assign failed with error $errcode")
  180. end
  181. end
  182. struct CKafkaMessage
  183. err::Cint
  184. rkt::Ptr{Cvoid}
  185. partition::Int32
  186. payload::Ptr{UInt8}
  187. len::Csize_t
  188. key::Ptr{UInt8}
  189. key_len::Csize_t
  190. offset::Int64
  191. _private::Ptr{Cvoid}
  192. end
  193. function kafka_consumer_poll(rk::Ptr{Cvoid}, timeout::Integer)
  194. #println("KAFKA_CONSUMER_POLL @threadcall")
  195. #=
  196. msg_ptr = ccall((:rd_kafka_consumer_poll, librdkafka), Ptr{CKafkaMessage},
  197. (Ptr{Cvoid}, Cint), rk, timeout)
  198. =#
  199. msg_ptr = @threadcall((:rd_kafka_consumer_poll, librdkafka), Ptr{CKafkaMessage},
  200. (Ptr{Cvoid}, Cint), rk, timeout)
  201. if msg_ptr != Ptr{CKafkaMessage}(0)
  202. return msg_ptr
  203. else
  204. return nothing
  205. end
  206. end
  207. function kafka_message_destroy(msg_ptr::Ptr{CKafkaMessage})
  208. ccall((:rd_kafka_message_destroy, librdkafka), Cvoid, (Ptr{Cvoid},), msg_ptr)
  209. end
  210. 5