diff --git a/resources/config.edn b/resources/config.edn index c3be771..6da8e92 100644 --- a/resources/config.edn +++ b/resources/config.edn @@ -1,3 +1,3 @@ {:websocket {:url "ws://localhost:7778"} :typesense {:url "http://localhost:8108" - :apikey "xyz"}} + :api-key "xyz"}} diff --git a/src/typesense_indexer/components/typesense.clj b/src/typesense_indexer/components/typesense.clj index 53404c3..1b97d6b 100644 --- a/src/typesense_indexer/components/typesense.clj +++ b/src/typesense_indexer/components/typesense.clj @@ -4,10 +4,6 @@ [org.httpkit.client :as http] [nostr.edufeed :as nostr])) -;; Typesense Client Configuration -(def typesense-url "http://localhost:8108") -(def typesense-api-key "xyz") - (def amb-schema {:name "amb" :enable_nested_fields true :fields [{:name "name" :type "string"} @@ -20,47 +16,50 @@ {:name "learningResourceType.id" :type "string[]" :optional true} {:name "learningResourceType.prefLabel" :type "object[]" :optional true}]}) -(defn insert-collection [] - (http/post (str typesense-url "/collections") - {:headers {"X-TYPESENSE-API-KEY" typesense-api-key +(defn insert-collection [url api-key] + (http/post (str url "/collections") + {:headers {"X-TYPESENSE-API-KEY" api-key "Content-Type" "application/json"} :body (json/generate-string amb-schema)})) -(defn check-for-schema [] - (http/get (str typesense-url "/collections") - {:headers {"X-TYPESENSE-API-KEY" typesense-api-key +(defn check-for-schema [url api-key] + (http/get (str url "/collections") + {:headers {"X-TYPESENSE-API-KEY" api-key "Content-Type" "application/json"}} (fn [{:keys [status headers body error opts]}] (if error (println "Failed, exception is " error) (when-not (some #(= "amb" (:name %)) (json/parse-string body true)) (do (println "AMB not found...inserting collection") - (insert-collection))))))) + (insert-collection url api-key))))))) ;; Utility function to insert a document into Typesense -(defn insert-to-typesense [collection event] - (let [url (str typesense-url "/collections/" collection "/documents?dirty_values=drop&action=upsert") +(defn insert-to-typesense [collection event url api-key] + (let [url (str url "/collections/" collection "/documents?dirty_values=drop&action=upsert") doc (nostr/convert-30142-to-nostr-amb event false)] (http/post url {:body (json/encode doc) - :headers {"X-TYPESENSE-API-KEY" typesense-api-key + :headers {"X-TYPESENSE-API-KEY" api-key "Content-Type" "application/json"}} (fn [{:keys [status headers body error opts]}] (if error - (println "Failed, error is: ") + (println "Failed, error is: " error) (println "Status " status "ID:" (:id doc))))))) -(defrecord Typesense [] +(defrecord Typesense [config] component/Lifecycle (start [component] (println ";; Starting Typesense Connection") - (assoc component :typesense typesense-url) - (check-for-schema)) + (println ";; config " config) + (check-for-schema (:url config) (:api-key config)) + (assoc component + :url (:url config) + :api-key (:api-key config))) (stop [component] (println ";; Stopping Typesense") (assoc component :typesense nil))) -(defn new-typesense-component [] - (map->Typesense {})) +(defn new-typesense-component [config] + (map->Typesense {:config config})) diff --git a/src/typesense_indexer/components/websocket.clj b/src/typesense_indexer/components/websocket.clj index 0aa45ec..d78abb8 100644 --- a/src/typesense_indexer/components/websocket.clj +++ b/src/typesense_indexer/components/websocket.clj @@ -1,75 +1,73 @@ (ns typesense-indexer.components.websocket - (:require [hato.websocket :as ws] - [com.stuartsierra.component :as component] - [cheshire.core :as json] + (:require [com.stuartsierra.component :as component] [typesense-indexer.components.typesense :as typesense] - [nostr.core :as nostr]) - (:import [java.nio CharBuffer])) + [clojure.core.async :as async :refer [chan go-loop alts! go >! (:created_at event) (or (:created_at @newest-event) 0)) - (reset! newest-event event)) - (when (= "EVENT" (first parsed)) - (do - (typesense/insert-to-typesense "amb" event) - (reset! last-parsed-event event))))) +(defn on-message-handler [ws parsed last-parsed-event newest-event typesense] + (try + (let [event (nth parsed 2 parsed)] + (when (and (= "EOSE" (first parsed)) + (not= (:created_at @last-parsed-event) (:created_at @newest-event))) + (init-request-30142 ws last-parsed-event newest-event "eose")) + (when (> (get event :created_at 0) (get @newest-event :created_at 0)) + (reset! newest-event event)) + (when (= "EVENT" (first parsed)) + (do + (typesense/insert-to-typesense "amb" event (:url typesense) (:api-key typesense)) + (reset! last-parsed-event event)))) + (catch Exception e + (println "error parsing event" parsed + "\n Error: " + e)))) -(defn create-websocket [url on-close-handler last-parsed-event newest-event] - (nostr/connect url - {:on-open-handler (fn [ws] (println "Opened connection to url " url)) - :on-message-handler (fn [ws msg] (on-message-handler ws msg last-parsed-event newest-event)) - :on-close-handler (fn [ws status reason] (on-close-handler status))})) - -(defrecord WebsocketConnection [url connection] +(defrecord WebsocketConnection [config + ws channel shutdown + typesense] component/Lifecycle (start [component] (println ";; Starting WebsocketConnection") + (println ";; typesense" typesense) (let [last-parsed-event (atom {:created_at nil}) - newest-event (atom nil) - reconnect (fn reconnect [status] - (println "Lost connection, attempting reconnect for: " url) - (when (= 1006 status) - (try - (Thread/sleep 3000) - (when-not (:connection component) - (let [wc (create-websocket url reconnect last-parsed-event newest-event)] - (assoc component :connection wc) - (init-request-30142 wc last-parsed-event newest-event "reconnect"))) - (catch Exception e - (println "Reconnect failed for " url ":" (.getMessage e)) - (reconnect status)))))] + newest-event (atom nil)] (try - (let [wc (create-websocket url reconnect last-parsed-event newest-event)] - (init-request-30142 wc last-parsed-event newest-event "init") - (assoc component :connection wc)) + (let [{:keys [ws channel]} (nostr/connect-channel (:url config))] + (init-request-30142 ws last-parsed-event newest-event "init") + (go-loop [] + (when-some [message (WebsocketConnection {:url url})) +(defn new-websocket-connection [config] + (component/using (map->WebsocketConnection {:config config }) [:typesense])) diff --git a/src/typesense_indexer/config.clj b/src/typesense_indexer/config.clj new file mode 100644 index 0000000..4ad4893 --- /dev/null +++ b/src/typesense_indexer/config.clj @@ -0,0 +1,9 @@ +(ns typesense-indexer.config + (:require [aero.core :as aero] + [clojure.java.io :as io])) + +(defn read-config + [] + (-> "config.edn" + (io/resource) + (aero/read-config))) diff --git a/src/typesense_indexer/core.clj b/src/typesense_indexer/core.clj deleted file mode 100644 index 1c0476f..0000000 --- a/src/typesense_indexer/core.clj +++ /dev/null @@ -1,14 +0,0 @@ -(ns typesense-indexer.core - (:require - [typesense-indexer.system :as system] - [com.stuartsierra.component :as component] - )) - -(defn -main [] - (component/start (system/new-system)) - ) - - -(comment - (component/stop (system/new-system)) - ) diff --git a/src/typesense_indexer/system.clj b/src/typesense_indexer/system.clj index dcba1fa..3c80101 100644 --- a/src/typesense_indexer/system.clj +++ b/src/typesense_indexer/system.clj @@ -1,21 +1,35 @@ (ns typesense-indexer.system (:require [com.stuartsierra.component :as component] + [typesense-indexer.config :as config] [typesense-indexer.components.websocket :as ws] [typesense-indexer.components.typesense :as typesense])) -(defn new-system-map [] +(defn new-system-map [config] (component/system-map - :typesense (typesense/new-typesense-component) - :websocket (component/using (ws/new-websocket-connection "ws://localhost:7778") - [:typesense]))) + :typesense (typesense/new-typesense-component (:typesense config)) + :websocket (ws/new-websocket-connection (:websocket config)))) + +(defn configure [system] + (let [config (config/read-config)] + (merge-with merge system config))) (defn new-dependency-map [] {}) -(defn new-system - "Create the production system" - [] - (println ";; Setting up new system") - (-> (new-system-map) - (component/system-using (new-dependency-map)))) - +(defn -main [] + (let [system (-> (config/read-config) + (new-system-map) + (component/start-system)) + shutdown (-> system :websocket :shutdown)] + ;; Add the shutdown hook before blocking + (.addShutdownHook + (Runtime/getRuntime) + (Thread. + #(do + (println ";; Shutdown hook triggered") + (component/stop-system system) + (println ";; System stopped")))) + ;; Block on the shutdown promise + (println "System running. Press Ctrl+C to stop.") + @shutdown ;; Blocks until the shutdown promise is delivered + (println ";; Exiting application...")))