diff --git a/otus-16/src/otus_16/clojure_async.clj b/otus-16/src/otus_16/clojure_async.clj new file mode 100644 index 0000000..81daf7a --- /dev/null +++ b/otus-16/src/otus_16/clojure_async.clj @@ -0,0 +1,52 @@ +(ns otus-16.clojure-async + (:require [clojure.core.async :refer [chan !! >! ! (:file @channels) file))) + + +(defn process-files [] + (go + (loop [] + (when-let [f (! (:lines @channels) l))))) + (recur)))) + +(defn collect-data [reducer] + (clojure.core.async/reduce reducer {} (:lines @channels))) + +(defn solution [] + (let [files (f/files "C:\\Users\\Anton\\IdeaProjects\\Clojure-Developer-2024-10\\otus-16\\static\\apache_logs.txt")] + (reset-ch) + (process-files) + (doseq [f files] (push-file f)) + (reset! result-ch (collect-data (c/create-collectors))))) + + +(comment + + (solution) + (close-ch) + (println (> + (f/files "C:\\Users\\Anton\\IdeaProjects\\Clojure-Developer-2024-10\\otus-16\\static") + (pmap #(process-file reducers % 50)) + (reduce c/merge-result-reducer))] + (println result))) + +(comment + (solution) + + ) diff --git a/otus-16/src/otus_16/collectors.clj b/otus-16/src/otus_16/collectors.clj new file mode 100644 index 0000000..aef5437 --- /dev/null +++ b/otus-16/src/otus_16/collectors.clj @@ -0,0 +1,98 @@ +(ns otus-16.collectors) +(def log-line + "127.0.0.1 - frank [10/Jan/2025:13:55:36 +0000] \"GET /apache_pb.gif HTTP/1.0\" 200 2326 \"http://www.example.com/start.html\" \"Mozilla/4.08 [en] (Win98; I ;Nav)\"") + +(def log-pattern + (re-pattern + (str + ;; IP адрес + "^(\\S+) " ;; %h: IP-адрес + "(\\S+) " ;; %l: идентификатор клиента (обычно "-") + "(\\S+) " ;; %u: имя пользователя (или "-") + "\\[([^\\]]+)\\] " ;; %t: дата/время в квадратных скобках + "\"([^\"]+)\" " ;; %r: HTTP-запрос в кавычках + "(\\d{3}) " ;; %>s: код состояния HTTP + "(\\S+) " ;; %b: размер ответа (или "-") + "\"([^\"]*)\" " ;; %{Referer}i: реферер в кавычках + "\"([^\"]*)\"$"))) ;; %{User-Agent}i: пользовательский агент в кавычках + +(defn parse-log-line-to-map [line] + (when-let [matches (re-matches log-pattern line)] + (zipmap [:ip :client-identity :user :timestamp :request :status :bytes :referer :user-agent] + (rest matches)))) + +(defn parse-log-line [line] + (when-let [matches (re-matches log-pattern line)] + (rest matches))) + +(defn to-int [v] + (try + (Integer/parseInt v) + (catch Exception _ 0))) + +(defn get-bytes [data] + (to-int (nth data 6))) + +(comment + (get-bytes (parse-log-line log-line))) + +(defn get-url [data] + (let [arr (clojure.string/split (nth data 4) #"\s+")] + (nth arr 1))) + + +(defn total-bytes-collector [acc data] + (let [bytes (get-bytes data)] + (update acc :total-bytes (fnil + 0) bytes))) + +(defn bytes-by-url-collector [target-url] + (fn [acc data] + (when (= (get-url data) target-url) + (update-in acc [:bytes-by-url target-url] + (fnil + 0) (get-bytes data))))) + +(defn urls-by-refer-collectors [target-refer] + (fn [acc data] + (when (= (nth data 7) target-refer) + (update-in acc [:urls-by-referrer target-refer] (fnil conj #{}) (get-url data))))) + +(defn transform-urls-by-referrer [data] + (update data :urls-by-referrer + (fn [urls-by-referrer] + (when urls-by-referrer + (into {} + (map (fn [[referrer urls]] + [referrer (count urls)]) + urls-by-referrer)))))) + +(defn combine-collectors [collectors] + (fn + ([] []) + ([acc] (transform-urls-by-referrer acc)) + ([acc data] (reduce (fn [a collector] (collector a data)) acc collectors)))) + +(defn apply-pipeline [collectors lines] + (transduce (map parse-log-line) collectors {} lines)) + +(defn merge-result-reducer + ([] {}) + ([acc m] + (-> acc + (update :total-bytes + (:total-bytes m 0)) + (update :bytes-by-url merge (:bytes-by-url m {})) + (update :urls-by-referrer merge (:urls-by-referrer m {}))))) + +(defn merge-results [maps] + (reduce merge-result-reducer + {:total-bytes 0, :bytes-by-url {}, :urls-by-referrer {}} + (apply concat maps))) + +(defn create-collectors [& {:keys [url referrer]}] + (let [collectors (cond-> [total-bytes-collector] + url (conj (bytes-by-url-collector url)) + referrer (conj (urls-by-refer-collectors referrer)))] + (combine-collectors collectors))) + + + + diff --git a/otus-16/src/otus_16/file_utils.clj b/otus-16/src/otus_16/file_utils.clj new file mode 100644 index 0000000..c63b42b --- /dev/null +++ b/otus-16/src/otus_16/file_utils.clj @@ -0,0 +1,15 @@ +(ns otus-16.file-utils + (:require [clojure.java.io :as io])) + + +(defn files [^String folder] + (filter #(not (.isDirectory %)) (file-seq (io/file folder)))) + +(defn with-folder [^String folder fn] + (fn (files folder))) + +(defmacro with-file [file binding & body] + `(with-open [rdr# (clojure.java.io/reader ~file)] + (let [~binding (line-seq rdr#)] + ~@body))) + diff --git a/otus-16/src/otus_16/java_threads.clj b/otus-16/src/otus_16/java_threads.clj new file mode 100644 index 0000000..bfe8cee --- /dev/null +++ b/otus-16/src/otus_16/java_threads.clj @@ -0,0 +1,75 @@ +(ns otus-16.java-threads + (:require [otus-16.file-utils :as f] + [otus-16.collectors :as c]) + (:import (java.util.concurrent Executors Future))) + + +(defonce executor-atom (atom nil)) + +(defn init-executor! [threads] + (reset! executor-atom (Executors/newFixedThreadPool ^Integer threads))) + +(defn shutdown-executor! [] + (when-let [exec @executor-atom] + (.shutdown exec) + (reset! executor-atom nil))) + +(defmacro submit-task [& body] + `(let [task# (reify Callable + (call [_] ~@body))] + (.submit @executor-atom task#))) + +(defn process-lines-task [reducer lines] + (submit-task (c/apply-pipeline reducer lines))) + +(defn process-file-task [reducer lines batch-size] + (submit-task + (let [chunks (partition-all batch-size lines) + results (map #(process-lines-task reducer %) chunks)] + (mapv #(.get ^Future %) results)))) + +(defn process-file [reducer file batch-size] + (f/with-file file lines + (let [^Future f (process-file-task reducer lines batch-size) + result (.get f)] + result))) + +(defn solution [& {:keys [url referrer] + :or {}}] + (init-executor! 16) ;; Initialize the global executor with 16 threads + (try + (let [reducers (c/create-collectors {:url url :referrer referrer}) + files (f/files "C:\\Users\\Anton\\IdeaProjects\\Clojure-Developer-2024-10\\otus-16\\static") + results (map #(process-file reducers % 50) files) + result (c/merge-results results)] + (println result)) + (finally + (shutdown-executor!)))) ;; Ensure the executor is shut down when done + + +(comment + + (solution) + + (with-executor 8 exec + (f/with-folder "C:\\Users\\Anton\\Downloads\\Telegram Desktop\\logs" + (fn [files] + (let [futures (map #(submit-task exec (.getName %)) files) + results (map #(.get %) futures)] + (doseq [r results] + (println r)))))) + + (with-executor 8 exec + (let [files (f/files "C:\\Users\\Anton\\Downloads\\Telegram Desktop\\logs")] + (let [futures (map #(submit-task exec (.getName %)) files) + results (map #(.get %) futures)] + (doseq [r results] + (println r))))) + + (with-executor 16 exec + (let [reducers (c/combine-collectors [c/total-bytes-collector]) + files (f/files "C:\\Users\\Anton\\IdeaProjects\\Clojure-Developer-2024-10\\otus-16\\static") + results (map #(process-file exec reducers % 50) files) + result (c/merge-results results)] + (println result))) + )