cleanup, return ws and channel instead of passing handlers around

This commit is contained in:
@s.roertgen 2024-12-19 18:12:54 +01:00
parent 4052651614
commit c43efc5fcf

View file

@ -1,41 +1,83 @@
(ns nostr.core
(:require [hato.websocket :as ws]
[cheshire.core :as json])
[cheshire.core :as json]
[clojure.core.async :refer [chan put! go-loop <!]])
(:import [java.nio CharBuffer]))
(def subscriptions (atom {}))
(def ws-uri-map (atom {}))
(defn add-to-subs [uri subscription]
(swap! subscriptions
(fn [current-map]
(update current-map uri (fn [existing-set]
(conj (or existing-set #{}) subscription))))))
(defn remove-sub [uri subscription]
(swap! subscriptions
(fn [current-map]
(update current-map uri (fn [existing-set]
(filter #(not= subscription %) existing-set))))))
(defn send! [ws msg]
(ws/send! ws (json/generate-string msg)))
(defn subscribe [ws event-filter]
(let [sub-id (str (random-uuid))
msg ["REQ" sub-id event-filter]]
(add-to-subs (get @ws-uri-map ws) msg)
(send! ws msg)))
(defn resubscribe [ws subs]
(for [sub subs]
(subscribe ws sub)))
(defn unsubscribe [ws event-filter]
(let [ws-uri (get @ws-uri-map ws)
sub (first (filter #(= event-filter (nth % 2)) (get @subscriptions ws-uri)))]
(send! ws ["CLOSE" (second sub)])
(remove-sub ws-uri sub)))
(defn close! [ws]
(ws/close! ws))
(defn connect
"Establishes a websocket connection and handles events using user-defined handlers.
Args:
- `uri`: The websocket URI to connect to.
- `handlers`: A map containing the following keys:
- `:on-open-handler`: (fn [ws] ...) called when the connection opens.
- `:on-message-handler`: (fn [ws parsed-msg] ...) called for each message.
- `:on-close-handler`: (fn [ws status reason] ...) called when the connection closes.
Returns:
- The websocket instance with your handlers."
[uri {:keys [on-message-handler on-open-handler on-close-handler]}]
@(ws/websocket uri
{:on-message (fn [ws msg last?]
(let [msg-str (if (instance? CharBuffer msg)
(str msg)
msg)
parsed (json/parse-string msg-str true)]
(on-message-handler ws parsed)))
:on-open (fn [ws]
(on-open-handler ws))
:on-close (fn [ws status reason]
(on-close-handler ws status reason))}))
(defn subscribe [ws msg]
(ws/send! ws (json/generate-string msg)))
(defn send! [ws msg]
(ws/send! ws (json/generate-string msg)))
(defn close! [ws]
(ws/close! ws))
- The websocket instance and a channel"
[uri]
(let [c (chan)
reconnect (fn reconnect [status]
(try
(Thread/sleep 3000)
(connect uri)
(resubscribe uri (get @subscriptions uri))
(catch Exception e
(println "Reconnect failed for " uri ":" (.getMessage e))
(reconnect status))))
ws @(ws/websocket uri
{:on-message (fn [ws msg last?]
(when last?
(let [msg-str
(if (instance? CharBuffer msg)
(str msg)
msg)
parsed (json/parse-string msg-str true)]
(when (contains? #{"EOSE" "EVENT"} (first parsed))
(put! c parsed)))))
:on-open (fn [ws]
(println "connection open: " ws))
:on-close (fn [ws status reason]
(println "connection closed: " ws status reason)
(when (= 1006 status)
(reconnect status)))})]
(swap! ws-uri-map assoc ws uri)
{:ws ws
:channel c}))
(defn fetch-events
"Args:
@ -50,28 +92,17 @@
"
[uri filter]
(let [resources (atom [])
result-promise (promise)]
(connect uri
{:on-message-handler (fn [ws msg]
(case (first msg)
"EVENT" (let [event (nth msg 2 nil)]
(swap! resources conj event)) ;; Add to resources
"EOSE" (do
(deliver result-promise @resources) ;; Deliver resources to the promise
(close! ws)))) ;; TODO should I really close here?
:on-open-handler (fn [ws]
(send! ws ["REQ" (random-uuid) filter]))})
result-promise (promise)
{:keys [ws channel]} (connect uri)]
(subscribe ws filter)
(go-loop []
(when-some [message (<! channel)]
(case (first message)
"EVENT" (let [event (nth message 2 nil)]
(swap! resources conj event))
"EOSE" (do
(close! ws)
(deliver result-promise @resources)))
(recur)))
@result-promise))
(comment
(let [ws (connect "ws://localhost:7778"
{:on-message-handler (fn [ws msg]
(println "Received:" msg)
msg)
:on-open-handler (fn [ws]
(println "WS:" ws))})]
(subscribe ws {:kinds [30142]
:limit 2})
(subscribe ws {:kinds [1]
:limit 2})))