-
-
Save zerg000000/2006009abefeac47a5ac82f38a55a8cb to your computer and use it in GitHub Desktop.
| {:deps {net.cgrand/xforms {:mvn/version "0.19.0"} | |
| org.clojure/clojure {:mvn/version "1.10.1"} | |
| org.clojure/core.async {:mvn/version "0.4.500"}}} |
| ;; dataset | |
| (def data [{:ts #inst "2019-01-01T12:00:00.125Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:00.250Z" :price 12.0} | |
| {:ts #inst "2019-01-01T12:00:00.375Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:00.500Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:00.625Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:00.750Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:00.875Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:01.000Z" :price 12.0} | |
| {:ts #inst "2019-01-01T12:00:01.125Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:01.250Z" :price 8.0} | |
| {:ts #inst "2019-01-01T12:00:01.375Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:01.500Z" :price 1.0} | |
| {:ts #inst "2019-01-01T12:00:01.625Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:01.750Z" :price 10.0} | |
| {:ts #inst "2019-01-01T12:00:01.875Z" :price 7.0} | |
| {:ts #inst "2019-01-01T12:00:02.000Z" :price 11.0}]) | |
| ;; require transducer library | |
| (require '[net.cgrand.xforms :as x]) | |
| (require '[net.cgrand.xforms.rfs :as rf]) | |
| ;; since I want to window by second | |
| ;; timestamp to sec | |
| (defn ts->sec [x] | |
| (int (/ (-> x :ts (.getTime)) | |
| 1000.0))) | |
| (def first-xf (x/reduce rf/some)) | |
| (def price-xf | |
| (comp (map :price) | |
| (x/transjuxt {:min x/min | |
| :max x/max | |
| :open first-xf | |
| :close x/last}))) | |
| (defn ticks->ohlc [ticks] | |
| (x/into [] price-xf ticks)) | |
| ; Actually, we can use non-transducer version for this fn | |
| #_(defn ticks->ohlc [ticks] | |
| (let [prices (mapv :price ticks)] | |
| [{:min (min prices) | |
| :max (max prices) | |
| :open (first prices) | |
| :close (last prices)}])) | |
| (def xf | |
| (comp (partition-by ts->sec) | |
| (mapcat ticks->ohlc))) | |
| (into [] xf data) | |
| ;; it could be reused in stream based processing | |
| (require '[clojure.core.async :as ac]) | |
| ;; attach transducer to a stream | |
| (def ch (ac/chan 1000 xf)) | |
| ;; create a async loop to prn output of stream | |
| (ac/go-loop [] | |
| (when-let [v (ac/<! ch)] | |
| (prn v) | |
| (recur))) | |
| ;; put values into the stream as you want | |
| (doseq [x data] | |
| (ac/>!! ch x)) | |
| ;; notice the last one is missing | |
| ;; you have to close the stream to see the last result | |
| (ac/close! ch) |
curious, line 35: does that mean ... (map first) won't work?
curious, line 35: does that mean
... (map first)won't work?
(map first) have a different meaning
(into [] (map first) [ [1] [2] [3] [4] [5] ])
[1 2 3 4 5]using
(x/reduce rf/some) would be more clear?
Interestingly, the following fn gives the same result
(defn ticks->ohlc [ticks]
(into []
(comp (map :price)
(x/transjuxt {:min x/min
:max x/max
:open identity
:close x/last}))
ticks))
Say the following is true
(=
(into [] (x/transjuxt [identity x/max]) [3 4 5])
(into [] (x/transjuxt [(map identity) x/max]) [3 4 5]))
Interestingly, the following fn gives the same result
(defn ticks->ohlc [ticks] (into [] (comp (map :price) (x/transjuxt {:min x/min :max x/max :open identity :close x/last})) ticks))Say the following is
true(= (into [] (x/transjuxt [identity x/max]) [3 4 5]) (into [] (x/transjuxt [(map identity) x/max]) [3 4 5]))
After some thinking, identity actually match the definition of transducer.
The definition of transducer -- a function that take reducing-fn return a function as reducing-fn
(fn identity [reducing-fn]
([] (reducing-fn))
([acc] (reducing-fn acc))
([acc item] (reducing-fn acc item)))
Result is the same if (def first-xf rf/some) is used instead of (def first-xf (x/reduce rf/some)), though rf/some is not a transducer. Not sure if there's any performance difference if input Coll is large.
the answer could be yes or not, transducer is (reducing-fn) -> reducing-fn, something like middleware in ring. rf/some is a reducing-fn, something like handler in ring.
line 43 should be:
(into [] xf data).