Skip to content

Instantly share code, notes, and snippets.

@tlk3
Last active January 13, 2024 13:40
Show Gist options
  • Select an option

  • Save tlk3/76f6c5516eceea5fc668664fa6073425 to your computer and use it in GitHub Desktop.

Select an option

Save tlk3/76f6c5516eceea5fc668664fa6073425 to your computer and use it in GitHub Desktop.
redpanda to deephaven
from deephaven.stream.kafka import consumer as kc
from deephaven import dtypes as dht
from deephaven import time as dhtu
from deephaven import agg
from deephaven.plot.figure import Figure
kafka_conf = {
"bootstrap.servers": "127.0.0.1:9093",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-256",
}
crypto = kc.consume(
kafka_conf,
"BINANCE.aggTrade",
table_type=kc.TableType.ring(500_000),
key_spec=kc.simple_spec("Symbol", dht.string),
value_spec=kc.json_spec(
[
("Timestamp", dht.Instant),
("Price", dht.double),
("Quantity", dht.double),
],
mapping={
"T": "Timestamp",
"p": "Price",
"q": "Quantity",
}
),
)
crypto_bin = crypto.update(
[
"Volume_USD = Price * Quantity",
"Fifteen = lowerBin(Timestamp, 15 * MINUTE)",
"Thirty = lowerBin(Timestamp, 30 * MINUTE)",
"Hourly = lowerBin(Timestamp, 1 * HOUR)",
"Daily = lowerBin(Timestamp, 1 * DAY)",
"Weekly = lowerBin(Timestamp, 1 * WEEK, 4 * DAY)",
]
)
agg_list = [
agg.first(cols=["Open = Price"]),
agg.max_(cols=["High = Price"]),
agg.min_(cols=["Low = Price"]),
agg.last(cols=["Close = Price"]),
agg.sum_(cols=["Volume = Quantity"]),
agg.sum_(cols=["Volume_USD = Volume_USD"]),
]
crypto_bin_sorted = crypto_bin.sort(order_by=["Fifteen"])
ohlcv_15 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Fifteen"]).last_by(by=["Symbol"])
ohlcv_30 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Thirty"]).last_by(by=["Symbol"])
ohlcv_60 = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Hourly"]).last_by(by=["Symbol"])
ohlcv_d = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Daily"]).last_by(by=["Symbol"])
ohlcv_w = crypto_bin_sorted.agg_by(agg_list, by=["Symbol", "Weekly"]).last_by(by=["Symbol"])
crypto_sum_by = crypto.view(formulas=["Symbol", "Trade_Count = 1", "Quantity", "Volume_USD = Price * Quantity"])\
.sum_by(by=["Symbol"])\
.sort_descending(order_by=["USD_Value"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment