This commit is contained in:
@s.roertgen 2024-12-31 11:23:29 +01:00
parent 2dc7f5c39b
commit 26bafba851
6 changed files with 109 additions and 103 deletions

View file

@ -1,3 +1,3 @@
{:websocket {:url "ws://localhost:7778"}
:typesense {:url "http://localhost:8108"
:apikey "xyz"}}
:api-key "xyz"}}

View file

@ -4,10 +4,6 @@
[org.httpkit.client :as http]
[nostr.edufeed :as nostr]))
;; Typesense Client Configuration
(def typesense-url "http://localhost:8108")
(def typesense-api-key "xyz")
(def amb-schema {:name "amb"
:enable_nested_fields true
:fields [{:name "name" :type "string"}
@ -20,47 +16,50 @@
{:name "learningResourceType.id" :type "string[]" :optional true}
{:name "learningResourceType.prefLabel" :type "object[]" :optional true}]})
(defn insert-collection []
(http/post (str typesense-url "/collections")
{:headers {"X-TYPESENSE-API-KEY" typesense-api-key
(defn insert-collection [url api-key]
(http/post (str url "/collections")
{:headers {"X-TYPESENSE-API-KEY" api-key
"Content-Type" "application/json"}
:body (json/generate-string amb-schema)}))
(defn check-for-schema []
(http/get (str typesense-url "/collections")
{:headers {"X-TYPESENSE-API-KEY" typesense-api-key
(defn check-for-schema [url api-key]
(http/get (str url "/collections")
{:headers {"X-TYPESENSE-API-KEY" api-key
"Content-Type" "application/json"}}
(fn [{:keys [status headers body error opts]}]
(if error
(println "Failed, exception is " error)
(when-not (some #(= "amb" (:name %)) (json/parse-string body true))
(do (println "AMB not found...inserting collection")
(insert-collection)))))))
(insert-collection url api-key)))))))
;; Utility function to insert a document into Typesense
(defn insert-to-typesense [collection event]
(let [url (str typesense-url "/collections/" collection "/documents?dirty_values=drop&action=upsert")
(defn insert-to-typesense [collection event url api-key]
(let [url (str url "/collections/" collection "/documents?dirty_values=drop&action=upsert")
doc (nostr/convert-30142-to-nostr-amb event false)]
(http/post url
{:body (json/encode doc)
:headers {"X-TYPESENSE-API-KEY" typesense-api-key
:headers {"X-TYPESENSE-API-KEY" api-key
"Content-Type" "application/json"}}
(fn [{:keys [status headers body error opts]}]
(if error
(println "Failed, error is: ")
(println "Failed, error is: " error)
(println "Status " status "ID:" (:id doc)))))))
(defrecord Typesense []
(defrecord Typesense [config]
component/Lifecycle
(start [component]
(println ";; Starting Typesense Connection")
(assoc component :typesense typesense-url)
(check-for-schema))
(println ";; config " config)
(check-for-schema (:url config) (:api-key config))
(assoc component
:url (:url config)
:api-key (:api-key config)))
(stop [component]
(println ";; Stopping Typesense")
(assoc component :typesense nil)))
(defn new-typesense-component []
(map->Typesense {}))
(defn new-typesense-component [config]
(map->Typesense {:config config}))

View file

@ -1,75 +1,73 @@
(ns typesense-indexer.components.websocket
(:require [hato.websocket :as ws]
[com.stuartsierra.component :as component]
[cheshire.core :as json]
(:require [com.stuartsierra.component :as component]
[typesense-indexer.components.typesense :as typesense]
[nostr.core :as nostr])
(:import [java.nio CharBuffer]))
[clojure.core.async :as async :refer [chan go-loop alts! go >! <!! <! take!]]
[nostr.core :as nostr]))
(defn init-request-30142 [wc last-event newest-event reason]
(defn init-request-30142 [ws last-event newest-event reason]
(println "init request with reason" reason)
(case reason
"eose" (do
(nostr/send! wc ["CLOSE" "RAND"])
(nostr/send! wc ["REQ" "RANDNEW" {:kinds [30142]
:since (:created_at @newest-event)}]))
"reconnect" (nostr/send! wc ["REQ" "RAND" {:kinds [1]
:limit 4
:until (:created_at @last-event)}])
"init" (nostr/send! wc ["REQ" "RAND" {:kinds [30142]
:limit 6000}])))
(nostr/unsubscribe ws {:kinds [30142]
:limit 6})
(nostr/subscribe ws {:kinds [30142]
:since (:created_at @newest-event)}))
"reconnect" (nostr/subscribe ws {:kinds [1]
:limit 4
:until (:created_at @last-event)})
"init" (nostr/subscribe ws {:kinds [30142]
:limit 6})))
(defn on-message-handler [ws parsed last-parsed-event newest-event]
(let [event (nth parsed 2 nil)]
(println (first parsed) (:id event))
(when (and (= "EOSE" (first parsed))
(not= (:created_at @last-parsed-event) (:created_at @newest-event)))
(init-request-30142 ws last-parsed-event newest-event "eose"))
(when (> (:created_at event) (or (:created_at @newest-event) 0))
(reset! newest-event event))
(when (= "EVENT" (first parsed))
(do
(typesense/insert-to-typesense "amb" event)
(reset! last-parsed-event event)))))
(defn on-message-handler [ws parsed last-parsed-event newest-event typesense]
(try
(let [event (nth parsed 2 parsed)]
(when (and (= "EOSE" (first parsed))
(not= (:created_at @last-parsed-event) (:created_at @newest-event)))
(init-request-30142 ws last-parsed-event newest-event "eose"))
(when (> (get event :created_at 0) (get @newest-event :created_at 0))
(reset! newest-event event))
(when (= "EVENT" (first parsed))
(do
(typesense/insert-to-typesense "amb" event (:url typesense) (:api-key typesense))
(reset! last-parsed-event event))))
(catch Exception e
(println "error parsing event" parsed
"\n Error: "
e))))
(defn create-websocket [url on-close-handler last-parsed-event newest-event]
(nostr/connect url
{:on-open-handler (fn [ws] (println "Opened connection to url " url))
:on-message-handler (fn [ws msg] (on-message-handler ws msg last-parsed-event newest-event))
:on-close-handler (fn [ws status reason] (on-close-handler status))}))
(defrecord WebsocketConnection [url connection]
(defrecord WebsocketConnection [config
ws channel shutdown
typesense]
component/Lifecycle
(start [component]
(println ";; Starting WebsocketConnection")
(println ";; typesense" typesense)
(let [last-parsed-event (atom {:created_at nil})
newest-event (atom nil)
reconnect (fn reconnect [status]
(println "Lost connection, attempting reconnect for: " url)
(when (= 1006 status)
(try
(Thread/sleep 3000)
(when-not (:connection component)
(let [wc (create-websocket url reconnect last-parsed-event newest-event)]
(assoc component :connection wc)
(init-request-30142 wc last-parsed-event newest-event "reconnect")))
(catch Exception e
(println "Reconnect failed for " url ":" (.getMessage e))
(reconnect status)))))]
newest-event (atom nil)]
(try
(let [wc (create-websocket url reconnect last-parsed-event newest-event)]
(init-request-30142 wc last-parsed-event newest-event "init")
(assoc component :connection wc))
(let [{:keys [ws channel]} (nostr/connect-channel (:url config))]
(init-request-30142 ws last-parsed-event newest-event "init")
(go-loop []
(when-some [message (<! channel)]
(on-message-handler ws message last-parsed-event newest-event typesense)
(recur)))
(assoc component
:ws ws
:channel channel
:shutdown (promise)))
(catch Exception e
(println "Failed to start connection for " url)
(reconnect 1006)))))
(println "Failed to start connection for " (:url config) "Error: " e))))) ;; 1006 means "abnormal closure"
(stop [component]
(println ";; Stopping WebsocketConnection for url " url)
(when-let [wc (:connection component)]
(nostr/close! wc))
(assoc component :connection nil)))
(println ";; Stopping WebsocketConnection for url" (:url config))
(when-let [ws (:ws component)]
;; Signal any ongoing loops to stop
(when-let [shutdown (:shutdown component)]
(deliver shutdown true))
(nostr/close! ws)) ;; Close the WebSocket connection
(assoc component :ws nil :channel nil :shutdown nil)))
(defn new-websocket-connection [url]
(map->WebsocketConnection {:url url}))
(defn new-websocket-connection [config]
(component/using (map->WebsocketConnection {:config config }) [:typesense]))

View file

@ -0,0 +1,9 @@
(ns typesense-indexer.config
(:require [aero.core :as aero]
[clojure.java.io :as io]))
(defn read-config
[]
(-> "config.edn"
(io/resource)
(aero/read-config)))

View file

@ -1,14 +0,0 @@
(ns typesense-indexer.core
(:require
[typesense-indexer.system :as system]
[com.stuartsierra.component :as component]
))
(defn -main []
(component/start (system/new-system))
)
(comment
(component/stop (system/new-system))
)

View file

@ -1,21 +1,35 @@
(ns typesense-indexer.system
(:require [com.stuartsierra.component :as component]
[typesense-indexer.config :as config]
[typesense-indexer.components.websocket :as ws]
[typesense-indexer.components.typesense :as typesense]))
(defn new-system-map []
(defn new-system-map [config]
(component/system-map
:typesense (typesense/new-typesense-component)
:websocket (component/using (ws/new-websocket-connection "ws://localhost:7778")
[:typesense])))
:typesense (typesense/new-typesense-component (:typesense config))
:websocket (ws/new-websocket-connection (:websocket config))))
(defn configure [system]
(let [config (config/read-config)]
(merge-with merge system config)))
(defn new-dependency-map [] {})
(defn new-system
"Create the production system"
[]
(println ";; Setting up new system")
(-> (new-system-map)
(component/system-using (new-dependency-map))))
(defn -main []
(let [system (-> (config/read-config)
(new-system-map)
(component/start-system))
shutdown (-> system :websocket :shutdown)]
;; Add the shutdown hook before blocking
(.addShutdownHook
(Runtime/getRuntime)
(Thread.
#(do
(println ";; Shutdown hook triggered")
(component/stop-system system)
(println ";; System stopped"))))
;; Block on the shutdown promise
(println "System running. Press Ctrl+C to stop.")
@shutdown ;; Blocks until the shutdown promise is delivered
(println ";; Exiting application...")))