diff --git a/cyclops-ctrl/internal/controller/modules.go b/cyclops-ctrl/internal/controller/modules.go index 23e972af..5945d6d4 100644 --- a/cyclops-ctrl/internal/controller/modules.go +++ b/cyclops-ctrl/internal/controller/modules.go @@ -2,12 +2,14 @@ package controller import ( "fmt" + "io" "net/http" "os" - "sigs.k8s.io/yaml" "strings" "time" + "sigs.k8s.io/yaml" + "github.com/gin-gonic/gin" "github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1" @@ -535,24 +537,43 @@ func (m *Modules) GetLogs(ctx *gin.Context) { ctx.Header("Access-Control-Allow-Origin", "*") logCount := int64(100) - rawLogs, err := m.kubernetesClient.GetPodLogs( - ctx.Param("namespace"), - ctx.Param("container"), - ctx.Param("name"), - &logCount, - ) - if err != nil { - fmt.Println(err) - ctx.JSON(http.StatusInternalServerError, dto.NewError("Error fetching logs", err.Error())) - return - } - logs := make([]string, 0, len(rawLogs)) - for _, log := range rawLogs { - logs = append(logs, trimLogLine(log)) - } + logChan := make(chan string) - ctx.JSON(http.StatusOK, logs) + go func() { + defer close(logChan) + + err := m.kubernetesClient.GetStreamedPodLogs( + ctx.Request.Context(), // we will have to pass the context for the k8s podClient - so it can stop the stream when the client disconnects + ctx.Param("namespace"), + ctx.Param("container"), + ctx.Param("name"), + &logCount, + logChan, + ) + if err != nil { + return + } + }() + + // stream logs to the client + ctx.Stream(func(w io.Writer) bool { + for { + select { + case log, ok := <-logChan: + if !ok { + return false + } + + ctx.SSEvent("pod-log", log) + return true + case <-ctx.Request.Context().Done(): + return false + case <-ctx.Done(): + return false + } + } + }) } func (m *Modules) GetDeploymentLogs(ctx *gin.Context) { diff --git a/cyclops-ctrl/internal/handler/handler.go b/cyclops-ctrl/internal/handler/handler.go index a425e9e4..00685011 100644 --- a/cyclops-ctrl/internal/handler/handler.go +++ b/cyclops-ctrl/internal/handler/handler.go @@ -82,7 +82,7 @@ func (h *Handler) Start() error { h.router.GET("/modules/:name/helm-template", modulesController.HelmTemplate) //h.router.POST("/modules/resources", modulesController.ModuleToResources) - h.router.GET("/resources/pods/:namespace/:name/:container/logs", modulesController.GetLogs) + h.router.GET("/resources/pods/:namespace/:name/:container/logs", sse.HeadersMiddleware(), modulesController.GetLogs) h.router.GET("/resources/pods/:namespace/:name/:container/logs/download", modulesController.DownloadLogs) h.router.GET("/manifest", modulesController.GetManifest) diff --git a/cyclops-ctrl/pkg/cluster/k8sclient/client.go b/cyclops-ctrl/pkg/cluster/k8sclient/client.go index 3b378853..16b4079c 100644 --- a/cyclops-ctrl/pkg/cluster/k8sclient/client.go +++ b/cyclops-ctrl/pkg/cluster/k8sclient/client.go @@ -7,15 +7,16 @@ import ( "errors" "fmt" "io" - networkingv1 "k8s.io/api/networking/v1" - "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/watch" "os" "os/exec" "sort" "strings" "time" + networkingv1 "k8s.io/api/networking/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/watch" + "github.com/cyclops-ui/cyclops/cyclops-ctrl/api/v1alpha1" "gopkg.in/yaml.v2" @@ -169,6 +170,34 @@ func (k *KubernetesClient) GetPods(namespace, name string) ([]apiv1.Pod, error) return podList.Items, err } +func (k *KubernetesClient) GetStreamedPodLogs(ctx context.Context, namespace, container, name string, logCount *int64, logChan chan<- string) error { + podLogOptions := apiv1.PodLogOptions{ + Container: container, + TailLines: logCount, + Timestamps: true, + Follow: true, + } + + podClient := k.clientset.CoreV1().Pods(namespace).GetLogs(name, &podLogOptions) + stream, err := podClient.Stream(ctx) + if err != nil { + return err + } + defer stream.Close() + + scanner := bufio.NewScanner(stream) + + for scanner.Scan() { + logChan <- scanner.Text() + } + + if err := scanner.Err(); err != nil { + return err + } + + return nil +} + func (k *KubernetesClient) GetPodLogs(namespace, container, name string, numLogs *int64) ([]string, error) { podLogOptions := apiv1.PodLogOptions{ Container: container, diff --git a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx index cfd34609..3db7a7e3 100644 --- a/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx +++ b/cyclops-ui/src/components/k8s-resources/common/PodTable/PodLogs.tsx @@ -1,16 +1,17 @@ import { ReadOutlined } from "@ant-design/icons"; import { Alert, Button, Col, Divider, Modal, Tabs, TabsProps } from "antd"; -import axios from "axios"; import { useState } from "react"; import ReactAce from "react-ace/lib/ace"; import { mapResponseError } from "../../../../utils/api/errors"; +import { isStreamingEnabled } from "../../../../utils/api/common"; +import { logStream } from "../../../../utils/api/sse/logs"; interface PodLogsProps { pod: any; } const PodLogs = ({ pod }: PodLogsProps) => { - const [logs, setLogs] = useState(""); + const [logs, setLogs] = useState([]); const [logsModal, setLogsModal] = useState({ on: false, namespace: "", @@ -18,6 +19,10 @@ const PodLogs = ({ pod }: PodLogsProps) => { containers: [], initContainers: [], }); + + // + const [, setLogsSignalController] = useState(null); + const [error, setError] = useState({ message: "", description: "", @@ -31,7 +36,14 @@ const PodLogs = ({ pod }: PodLogsProps) => { containers: [], initContainers: [], }); - setLogs(""); + setLogs([]); + setLogsSignalController((prevController) => { + if (prevController) { + prevController.abort(); + } + + return null; + }); }; const getTabItems = () => { @@ -50,7 +62,7 @@ const PodLogs = ({ pod }: PodLogsProps) => { type="primary" // icon={} onClick={downloadLogs(container.name)} - disabled={logs === "No logs available"} + disabled={logs.length === 0} > Download @@ -58,7 +70,9 @@ const PodLogs = ({ pod }: PodLogsProps) => { @@ -78,7 +92,7 @@ const PodLogs = ({ pod }: PodLogsProps) => { type="primary" // icon={} onClick={downloadLogs(container.name)} - disabled={logs === "No logs available"} + disabled={logs.length === 0} > Download @@ -86,7 +100,9 @@ const PodLogs = ({ pod }: PodLogsProps) => { @@ -99,31 +115,32 @@ const PodLogs = ({ pod }: PodLogsProps) => { }; const onLogsTabsChange = (container: string) => { - axios - .get( - "/api/resources/pods/" + - logsModal.namespace + - "/" + - logsModal.pod + - "/" + - container + - "/logs", - ) - .then((res) => { - if (res.data) { - let log = ""; - res.data.forEach((s: string) => { - log += s; - log += "\n"; + const controller = new AbortController(); + setLogsSignalController((prevController) => { + if (prevController) { + prevController.abort(); + } + + return controller; + }); + setLogs(() => []); //this is to remove the previous pod's logs + + if (isStreamingEnabled()) { + logStream( + logsModal.pod, + logsModal.namespace, + container, + (log) => { + setLogs((prevLogs) => { + return [...prevLogs, log]; }); - setLogs(log); - } else { - setLogs("No logs available"); - } - }) - .catch((error) => { - setError(mapResponseError(error)); - }); + }, + (err) => { + setError(mapResponseError(err)); + }, + controller, + ); + } }; const downloadLogs = (container: string) => { @@ -139,45 +156,40 @@ const PodLogs = ({ pod }: PodLogsProps) => { }; }; - const handleViewPodLogs = async () => { - axios - .get( - "/api/resources/pods/" + - pod.namespace + - "/" + - pod.name + - "/" + - pod.containers[0].name + - "/logs", - ) - .then((res) => { - if (res.data) { - let log = ""; - res.data.forEach((s: string) => { - log += s; - log += "\n"; - }); - setLogs(log); - } else { - setLogs("No logs available"); - } - }) - .catch((error) => { - setError(mapResponseError(error)); - }); - - setLogsModal({ - on: true, - namespace: pod.namespace, - pod: pod.name, - containers: pod.containers, - initContainers: pod.initContainers, - }); - }; - return ( <> -