Last active
January 13, 2024 13:40
-
-
Save tlk3/76f6c5516eceea5fc668664fa6073425 to your computer and use it in GitHub Desktop.
redpanda to deephaven
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from deephaven.stream.kafka import consumer as kc | |
| from deephaven import dtypes as dht | |
| from deephaven import time as dhtu | |
| from deephaven import agg | |
| from deephaven.plot.figure import Figure | |
| kafka_conf = { | |
| "bootstrap.servers": "127.0.0.1:9093", | |
| "security.protocol": "SASL_PLAINTEXT", | |
| "sasl.mechanism": "SCRAM-SHA-256", | |
| } | |
| crypto = kc.consume( | |
| kafka_conf, | |
| "BINANCE.aggTrade", | |
| table_type=kc.TableType.ring(500_000), | |
| key_spec=kc.simple_spec("Symbol", dht.string), | |
| value_spec=kc.json_spec( | |
| [ | |
| ("Timestamp", dht.Instant), | |
| ("Price", dht.double), | |
| ("Quantity", dht.double), | |
| ], | |
| mapping={ | |
| "T": "Timestamp", | |
| "p": "Price", | |
| "q": "Quantity", | |
| } | |
| ), | |
| ) | |
| crypto_bin = crypto.update( | |
| [ | |
| "Volume_USD = Price * Quantity", | |
| "Fifteen = lowerBin(Timestamp, 15 * MINUTE)", | |
| "Thirty = lowerBin(Timestamp, 30 * MINUTE)", | |
| "Hourly = lowerBin(Timestamp, 1 * HOUR)", | |
| "Daily = lowerBin(Timestamp, 1 * DAY)", | |
| "Weekly = lowerBin(Timestamp, 1 * WEEK, 4 * DAY)", | |
| ] | |
| ) | |
| agg_list = [ | |
| agg.first(cols=["Open = Price"]), | |
| agg.max_(cols=["High = Price"]), | |
| agg.min_(cols=["Low = Price"]), | |
| agg.last(cols=["Close = Price"]), | |
| agg.sum_(cols=["Volume = Quantity"]), | |
| agg.sum_(cols=["Volume_USD = Volume_USD"]), | |
| ] | |
| crypto_bin_sorted = crypto_bin.sort(order_by=["Fifteen"]) | |
| ohlcv_15 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Fifteen"]).last_by(by=["Symbol"]) | |
| ohlcv_30 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Thirty"]).last_by(by=["Symbol"]) | |
| ohlcv_60 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Hourly"]).last_by(by=["Symbol"]) | |
| ohlcv_d = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Daily"]).last_by(by=["Symbol"]) | |
| ohlcv_w = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Weekly"]).last_by(by=["Symbol"]) | |
| crypto_sum_by = crypto.view(formulas=["Symbol", "Trade_Count = 1", "Quantity", "Volume_USD = Price * Quantity"])\ | |
| .sum_by(by=["Symbol"])\ | |
| .sort_descending(order_by=["USD_Value"]) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment