From 8409c811d70af382e236c5b84820fdfd2f35054a Mon Sep 17 00:00:00 2001 From: "@s.roertgen" Date: Mon, 18 Nov 2024 10:19:44 +0100 Subject: [PATCH] Seems to work pretty well --- .gitignore | 30 +++++++ deps.edn | 5 ++ .../components/typesense.clj | 66 +++++++++++++++ .../components/websocket.clj | 83 +++++++++++++++++++ src/typesense_indexer/core.clj | 14 ++++ src/typesense_indexer/system.clj | 10 +++ 6 files changed, 208 insertions(+) create mode 100644 .gitignore create mode 100644 deps.edn create mode 100644 src/typesense_indexer/components/typesense.clj create mode 100644 src/typesense_indexer/components/websocket.clj create mode 100644 src/typesense_indexer/core.clj create mode 100644 src/typesense_indexer/system.clj diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0a04143 --- /dev/null +++ b/.gitignore @@ -0,0 +1,30 @@ +.calva/output-window/ +.calva/repl.calva-repl +.classpath +.clj-kondo/.cache +.cpcache +.eastwood +.factorypath +.hg/ +.hgignore +.java-version +.lein-* +.lsp/.cache +.lsp/sqlite.db +.nrepl-history +.nrepl-port +.portal/vs-code.edn +.project +.rebel_readline_history +.settings +.socket-repl-port +.sw* +.vscode +*.class +*.jar +*.swp +*~ +/checkouts +/classes +/target + diff --git a/deps.edn b/deps.edn new file mode 100644 index 0000000..648fdc5 --- /dev/null +++ b/deps.edn @@ -0,0 +1,5 @@ +{:deps {com.stuartsierra/component {:mvn/version "1.1.0"} + hato/hato {:mvn/version "1.0.0"} + http-kit/http-kit {:mvn/version "2.8.0"} + nostr-clj/nostr-clj {:local/root "/home/laoc/coding/test/nostr-clj"} + cheshire/cheshire {:mvn/version "5.10.0"}}} diff --git a/src/typesense_indexer/components/typesense.clj b/src/typesense_indexer/components/typesense.clj new file mode 100644 index 0000000..96a6595 --- /dev/null +++ b/src/typesense_indexer/components/typesense.clj @@ -0,0 +1,66 @@ +(ns typesense-indexer.components.typesense + (:require [com.stuartsierra.component :as component] + [cheshire.core :as json] + [org.httpkit.client :as http] + [nostr-clj.edufeed :as nostr])) + +;; Typesense Client Configuration +(def typesense-url "http://localhost:8108") +(def typesense-api-key "xyz") + +(def amb-schema {:name "amb" + :enable_nested_fields true + :fields [{:name "name" :type "string"} + {:name "author" :type "string" :optional true} + {:name "creator" :type "object[]" :optional true} + {:name "description" :type "string" :optional true} + {:name "keywords" :type "string[]" :optional true} + {:name "about.id" :type "string[]" :optional true} + {:name "about.prefLabel" :type "object[]" :optional true}]}) + +(defn insert-collection [] + (http/post (str typesense-url "/collections") + {:headers {"X-TYPESENSE-API-KEY" typesense-api-key + "Content-Type" "application/json"} + :body (json/generate-string amb-schema)})) + +(defn check-for-schema [] + (http/get (str typesense-url "/collections") + {:headers {"X-TYPESENSE-API-KEY" typesense-api-key + "Content-Type" "application/json"}} + (fn [{:keys [status headers body error opts]}] + (if error + (println "Failed, exception is " error) + (when-not (some #(= "amb" (:name %)) (json/parse-string body true)) + (do (println "AMB not found...inserting collection") + (insert-collection))))))) + +;; Utility function to insert a document into Typesense +(defn insert-to-typesense [collection event] + (let [url (str typesense-url "/collections/" collection "/documents?dirty_values=drop&action=upsert") + doc (nostr/convert-30142-to-nostr-amb event) + ; _ (println "converted doc " doc) + ] + (http/post url + {:body (json/encode doc) + :headers {"X-TYPESENSE-API-KEY" typesense-api-key + "Content-Type" "application/json"}} + (fn [{:keys [status headers body error opts]}] + (if error + (println "Failed, error is: ") + (println "Status " status "ID:" (:id doc))))))) + +(defrecord Typesense [] + component/Lifecycle + (start [component] + (println ";; Starting Typesense Connection") + (assoc component :typesense typesense-url) + (check-for-schema)) + + (stop [component] + (println ";; Stopping Typesense") + (assoc component :typesense nil))) + +(defn new-typesense-component [] + (map->Typesense {})) + diff --git a/src/typesense_indexer/components/websocket.clj b/src/typesense_indexer/components/websocket.clj new file mode 100644 index 0000000..95e5f06 --- /dev/null +++ b/src/typesense_indexer/components/websocket.clj @@ -0,0 +1,83 @@ +(ns typesense-indexer.components.websocket + (:require [hato.websocket :as ws] + [com.stuartsierra.component :as component] + [cheshire.core :as json] + [typesense-indexer.components.typesense :as typesense]) + (:import [java.nio CharBuffer])) + +(defn init-request-30142 [wc last-event newest-event reason] + (case reason + "eose" (do + (ws/send! wc (json/generate-string ["CLOSE" "RAND"])) + (ws/send! wc (json/generate-string ["REQ" "RANDNEW" {:kinds [30142] + :since (:created_at @newest-event)}]))) + "reconnect" (ws/send! wc (json/generate-string ["REQ" "RAND" {:kinds [1] + :limit 4 + :until (:created_at @last-event)}])) + "init" (ws/send! wc (json/generate-string ["REQ" "RAND" {:kinds [30142] + :limit 6000}])))) + +(defn on-message-handler [last-parsed-event newest-event ws msg last?] + (let [msg-str (if (instance? CharBuffer msg) + (str msg) + msg) + parsed (json/parse-string msg-str true) + event (nth parsed 2 nil)] + (println (first parsed) (:id event)) + (when (and (= "EOSE" (first parsed)) + (not= (:created_at @last-parsed-event) (:created_at @newest-event))) + (init-request-30142 ws last-parsed-event newest-event "eose")) + (when (> (:created_at event) (or (:created_at @newest-event) 0)) + (reset! newest-event event)) + (when (= "EVENT" (first parsed)) + (do + (typesense/insert-to-typesense "amb" event) + (reset! last-parsed-event event))))) + +(defn create-websocket [url on-close-handler last-parsed-event newest-event] + @(ws/websocket url + {:on-open (fn [ws] + (println "Opened connection to url " url)) + :on-message (fn [ws msg last?] + (on-message-handler last-parsed-event newest-event ws msg last?)) + :on-close (fn [ws status reason] + ;; status 1000 für ws/close!, status 1006 bei connection lost + (println "closed connection. status: " status " reason " reason) + (on-close-handler status))})) + +(defrecord WebsocketConnection [url connection] + component/Lifecycle + (start [component] + (println ";; Starting WebsocketConnection") + (let [last-parsed-event (atom {:created_at nil}) + newest-event (atom nil) + reconnect (fn reconnect [status] + (println "Lost connection, attempting reconnect for: " url) + (when (= 1006 status) + (try + (Thread/sleep 3000) + (when-not (:connection component) + (let [wc (create-websocket url reconnect last-parsed-event newest-event)] + (assoc component :connection wc) + (init-request-30142 wc last-parsed-event newest-event "reconnect"))) + (catch Exception e + (println "Reconnect failed for " url ":" (.getMessage e)) + (reconnect status)))))] + (try + (let [wc (create-websocket url reconnect last-parsed-event newest-event)] + (assoc component :connection wc) + (init-request-30142 wc last-parsed-event newest-event "init")) + + (catch Exception e + (println "Failed to start connection for " url) + (reconnect 1006))))) + + (stop [component] + (println ";; Stopping WebsocketConnection for url " url) + (when-let [wc (:connection component)] + (ws/close! wc)) + (assoc component :connection nil))) + +(defn new-websocket-connection [url] + (map->WebsocketConnection {:url url})) + diff --git a/src/typesense_indexer/core.clj b/src/typesense_indexer/core.clj new file mode 100644 index 0000000..d662a82 --- /dev/null +++ b/src/typesense_indexer/core.clj @@ -0,0 +1,14 @@ +(ns typesense-indexer.core + (:require + [typesense-indexer.system :as system] + [com.stuartsierra.component :as component] + )) + +(defn main [] + (component/start (system/system)) + ) + + +(comment + (component/stop (system/system)) + ) diff --git a/src/typesense_indexer/system.clj b/src/typesense_indexer/system.clj new file mode 100644 index 0000000..a346846 --- /dev/null +++ b/src/typesense_indexer/system.clj @@ -0,0 +1,10 @@ +(ns typesense-indexer.system + (:require [com.stuartsierra.component :as component] + [typesense-indexer.components.websocket :as ws] + [typesense-indexer.components.typesense :as typesense])) + +(defn system [] + (component/system-map + :typesense (typesense/new-typesense-component) + :websocket (component/using (ws/new-websocket-connection "ws://localhost:7778") + [:typesense])))