Seems to work pretty well

This commit is contained in:
@s.roertgen 2024-11-18 10:19:44 +01:00
commit 8409c811d7
6 changed files with 208 additions and 0 deletions

30
.gitignore vendored Normal file
View file

@ -0,0 +1,30 @@
.calva/output-window/
.calva/repl.calva-repl
.classpath
.clj-kondo/.cache
.cpcache
.eastwood
.factorypath
.hg/
.hgignore
.java-version
.lein-*
.lsp/.cache
.lsp/sqlite.db
.nrepl-history
.nrepl-port
.portal/vs-code.edn
.project
.rebel_readline_history
.settings
.socket-repl-port
.sw*
.vscode
*.class
*.jar
*.swp
*~
/checkouts
/classes
/target

5
deps.edn Normal file
View file

@ -0,0 +1,5 @@
{:deps {com.stuartsierra/component {:mvn/version "1.1.0"}
hato/hato {:mvn/version "1.0.0"}
http-kit/http-kit {:mvn/version "2.8.0"}
nostr-clj/nostr-clj {:local/root "/home/laoc/coding/test/nostr-clj"}
cheshire/cheshire {:mvn/version "5.10.0"}}}

View file

@ -0,0 +1,66 @@
(ns typesense-indexer.components.typesense
(:require [com.stuartsierra.component :as component]
[cheshire.core :as json]
[org.httpkit.client :as http]
[nostr-clj.edufeed :as nostr]))
;; Typesense Client Configuration
(def typesense-url "http://localhost:8108")
(def typesense-api-key "xyz")
(def amb-schema {:name "amb"
:enable_nested_fields true
:fields [{:name "name" :type "string"}
{:name "author" :type "string" :optional true}
{:name "creator" :type "object[]" :optional true}
{:name "description" :type "string" :optional true}
{:name "keywords" :type "string[]" :optional true}
{:name "about.id" :type "string[]" :optional true}
{:name "about.prefLabel" :type "object[]" :optional true}]})
(defn insert-collection []
(http/post (str typesense-url "/collections")
{:headers {"X-TYPESENSE-API-KEY" typesense-api-key
"Content-Type" "application/json"}
:body (json/generate-string amb-schema)}))
(defn check-for-schema []
(http/get (str typesense-url "/collections")
{:headers {"X-TYPESENSE-API-KEY" typesense-api-key
"Content-Type" "application/json"}}
(fn [{:keys [status headers body error opts]}]
(if error
(println "Failed, exception is " error)
(when-not (some #(= "amb" (:name %)) (json/parse-string body true))
(do (println "AMB not found...inserting collection")
(insert-collection)))))))
;; Utility function to insert a document into Typesense
(defn insert-to-typesense [collection event]
(let [url (str typesense-url "/collections/" collection "/documents?dirty_values=drop&action=upsert")
doc (nostr/convert-30142-to-nostr-amb event)
; _ (println "converted doc " doc)
]
(http/post url
{:body (json/encode doc)
:headers {"X-TYPESENSE-API-KEY" typesense-api-key
"Content-Type" "application/json"}}
(fn [{:keys [status headers body error opts]}]
(if error
(println "Failed, error is: ")
(println "Status " status "ID:" (:id doc)))))))
(defrecord Typesense []
component/Lifecycle
(start [component]
(println ";; Starting Typesense Connection")
(assoc component :typesense typesense-url)
(check-for-schema))
(stop [component]
(println ";; Stopping Typesense")
(assoc component :typesense nil)))
(defn new-typesense-component []
(map->Typesense {}))

View file

@ -0,0 +1,83 @@
(ns typesense-indexer.components.websocket
(:require [hato.websocket :as ws]
[com.stuartsierra.component :as component]
[cheshire.core :as json]
[typesense-indexer.components.typesense :as typesense])
(:import [java.nio CharBuffer]))
(defn init-request-30142 [wc last-event newest-event reason]
(case reason
"eose" (do
(ws/send! wc (json/generate-string ["CLOSE" "RAND"]))
(ws/send! wc (json/generate-string ["REQ" "RANDNEW" {:kinds [30142]
:since (:created_at @newest-event)}])))
"reconnect" (ws/send! wc (json/generate-string ["REQ" "RAND" {:kinds [1]
:limit 4
:until (:created_at @last-event)}]))
"init" (ws/send! wc (json/generate-string ["REQ" "RAND" {:kinds [30142]
:limit 6000}]))))
(defn on-message-handler [last-parsed-event newest-event ws msg last?]
(let [msg-str (if (instance? CharBuffer msg)
(str msg)
msg)
parsed (json/parse-string msg-str true)
event (nth parsed 2 nil)]
(println (first parsed) (:id event))
(when (and (= "EOSE" (first parsed))
(not= (:created_at @last-parsed-event) (:created_at @newest-event)))
(init-request-30142 ws last-parsed-event newest-event "eose"))
(when (> (:created_at event) (or (:created_at @newest-event) 0))
(reset! newest-event event))
(when (= "EVENT" (first parsed))
(do
(typesense/insert-to-typesense "amb" event)
(reset! last-parsed-event event)))))
(defn create-websocket [url on-close-handler last-parsed-event newest-event]
@(ws/websocket url
{:on-open (fn [ws]
(println "Opened connection to url " url))
:on-message (fn [ws msg last?]
(on-message-handler last-parsed-event newest-event ws msg last?))
:on-close (fn [ws status reason]
;; status 1000 für ws/close!, status 1006 bei connection lost
(println "closed connection. status: " status " reason " reason)
(on-close-handler status))}))
(defrecord WebsocketConnection [url connection]
component/Lifecycle
(start [component]
(println ";; Starting WebsocketConnection")
(let [last-parsed-event (atom {:created_at nil})
newest-event (atom nil)
reconnect (fn reconnect [status]
(println "Lost connection, attempting reconnect for: " url)
(when (= 1006 status)
(try
(Thread/sleep 3000)
(when-not (:connection component)
(let [wc (create-websocket url reconnect last-parsed-event newest-event)]
(assoc component :connection wc)
(init-request-30142 wc last-parsed-event newest-event "reconnect")))
(catch Exception e
(println "Reconnect failed for " url ":" (.getMessage e))
(reconnect status)))))]
(try
(let [wc (create-websocket url reconnect last-parsed-event newest-event)]
(assoc component :connection wc)
(init-request-30142 wc last-parsed-event newest-event "init"))
(catch Exception e
(println "Failed to start connection for " url)
(reconnect 1006)))))
(stop [component]
(println ";; Stopping WebsocketConnection for url " url)
(when-let [wc (:connection component)]
(ws/close! wc))
(assoc component :connection nil)))
(defn new-websocket-connection [url]
(map->WebsocketConnection {:url url}))

View file

@ -0,0 +1,14 @@
(ns typesense-indexer.core
(:require
[typesense-indexer.system :as system]
[com.stuartsierra.component :as component]
))
(defn main []
(component/start (system/system))
)
(comment
(component/stop (system/system))
)

View file

@ -0,0 +1,10 @@
(ns typesense-indexer.system
(:require [com.stuartsierra.component :as component]
[typesense-indexer.components.websocket :as ws]
[typesense-indexer.components.typesense :as typesense]))
(defn system []
(component/system-map
:typesense (typesense/new-typesense-component)
:websocket (component/using (ws/new-websocket-connection "ws://localhost:7778")
[:typesense])))