Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion cmd/ksef/commands/authorization/logout.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"ksef/cmd/ksef/commands/client"
"ksef/cmd/ksef/flags"
"ksef/internal/client/v2/auth"
kr "ksef/internal/keyring"
"ksef/internal/logging"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand All @@ -25,7 +27,14 @@ func logout(cmd *cobra.Command, _ []string) error {
vip := viper.GetViper()
vip.Set(auth.FlagExitAfterPersistingToken, "true")

cli, err := client.InitClient(cmd)
keyring, err := kr.NewKeyring(vip)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

cli, err := client.InitClient(cmd, vip, keyring)
if err != nil {
return err
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/ksef/commands/authorization/sessions/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"ksef/cmd/ksef/flags"
"ksef/internal/client/v2/auth"
kr "ksef/internal/keyring"
"ksef/internal/logging"

"github.com/alexeyco/simpletable"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -32,10 +34,18 @@ func getSessions(cmd *cobra.Command, _ []string) error {

vip := viper.GetViper()

keyring, err := kr.NewKeyring(vip)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

tokenManager, err = auth.NewTokenManager(
cmd.Context(),
vip,
nil,
keyring,
)
if err != nil {
return err
Expand Down
11 changes: 10 additions & 1 deletion cmd/ksef/commands/certificates/prepare_csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"ksef/cmd/ksef/commands/client"
"ksef/cmd/ksef/flags"
"ksef/internal/certsdb"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"

Expand Down Expand Up @@ -38,7 +39,15 @@ func init() {
func sendCsrs(cmd *cobra.Command, _ []string) error {
envId := runtime.GetEnvironmentId(viper.GetViper())
nip, _ := cmd.Flags().GetString(flags.FlagNameNIP)
if cli, err = client.InitClient(cmd); err != nil {

keyring, err := kr.NewKeyring(viper.GetViper())
if err != nil {
logging.SeiLogger.Error("unable to initialize keyring", "err", err)
return err
}
defer keyring.Close()

if cli, err = client.InitClient(cmd, viper.GetViper(), keyring); err != nil {
return err
}
certsManager, err := cli.Certificates(envId)
Expand Down
11 changes: 10 additions & 1 deletion cmd/ksef/commands/certificates/push_csr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package certificates
import (
"ksef/cmd/ksef/commands/client"
"ksef/cmd/ksef/flags"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"

"github.com/spf13/cobra"
Expand All @@ -23,7 +25,14 @@ func init() {
}

func syncEnrollments(cmd *cobra.Command, _ []string) error {
if cli, err = client.InitClient(cmd); err != nil {
keyring, err := kr.NewKeyring(viper.GetViper())
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

if cli, err = client.InitClient(cmd, viper.GetViper(), keyring); err != nil {
return err
}
certsManager, err := cli.Certificates(runtime.GetEnvironmentId(viper.GetViper()))
Expand Down
5 changes: 3 additions & 2 deletions cmd/ksef/commands/client/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"ksef/internal/certsdb"
v2 "ksef/internal/client/v2"
"ksef/internal/client/v2/auth/token"
kr "ksef/internal/keyring"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

func InitClient(cmd *cobra.Command, initializers ...v2.InitializerFunc) (*v2.APIClient, error) {
vip := viper.GetViper()
func InitClient(cmd *cobra.Command, vip *viper.Viper, keyring kr.Keyring, initializers ...v2.InitializerFunc) (*v2.APIClient, error) {
var err error

var cli *v2.APIClient
Expand All @@ -25,6 +25,7 @@ func InitClient(cmd *cobra.Command, initializers ...v2.InitializerFunc) (*v2.API
}

clientInitializers := []v2.InitializerFunc{
v2.WithKeyring(keyring),
v2.WithAuthValidator(
token.NewAuthHandler(
vip,
Expand Down
30 changes: 29 additions & 1 deletion cmd/ksef/commands/invoices/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,19 @@ import (
"ksef/cmd/ksef/flags"
"ksef/internal/invoicesdb"
downloaderconfig "ksef/internal/invoicesdb/downloader/config"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

const (
flagNameWorkersLong = "workers"
flagNameWorkersShort = "w"
)

var downloadCommand = &cobra.Command{
Use: "download",
Short: "pobierz nowe faktury z KSeF",
Expand All @@ -22,12 +29,17 @@ func init() {
flags.NIP(flagSet)
downloaderconfig.DownloaderFlags(flagSet, "")
runtime.CertProfileFlag(flagSet)
flagSet.IntP(flagNameWorkersLong, flagNameWorkersShort, 0, "Ilość workerów (domyślnie 0; wartość > 0 oznacza ilość współbieżnych wątków pobierających faktury dla wszystkich zarejestrowanych numerów NIP)")

InvoicesCommand.AddCommand(downloadCommand)
}

func downloadRun(cmd *cobra.Command, _ []string) error {
vip := viper.GetViper()
workers := vip.GetInt(flagNameWorkersLong)
if workers > 0 {
return downloadRunParalell(cmd, vip, workers)
}
if err := runtime.CheckNIPIsSet(vip); err != nil {
return err
}
Expand All @@ -37,7 +49,14 @@ func downloadRun(cmd *cobra.Command, _ []string) error {
return err
}

ksefClient, err := client.InitClient(cmd)
keyring, err := kr.NewKeyring(vip)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

ksefClient, err := client.InitClient(cmd, vip, keyring)
if err != nil {
return err
}
Expand All @@ -55,3 +74,12 @@ func downloadRun(cmd *cobra.Command, _ []string) error {

return invoicesDB.DownloadInvoices(cmd.Context(), vip, downloaderConfig)
}

func cloneViper(src *viper.Viper) *viper.Viper {
newViper := viper.New()
for _, key := range src.AllKeys() {
newViper.Set(key, src.Get(key))
}

return newViper
}
139 changes: 139 additions & 0 deletions cmd/ksef/commands/invoices/download_paralell.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package invoices

import (
"ksef/cmd/ksef/commands/client"
"ksef/internal/client/v2/types/invoices"
"ksef/internal/invoicesdb"
downloaderconfig "ksef/internal/invoicesdb/downloader/config"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"
"sync"

"github.com/spf13/cobra"
"github.com/spf13/viper"
)

type downloadError struct {
NIP string
err error
}

func downloadRunParalell(cmd *cobra.Command, baseViper *viper.Viper, numWorkers int) error {
// let's collect NIP's
nipNumbers, err := invoicesdb.GetAllNIPs(baseViper)
if err != nil {
return err
}

downloaderConfig, err := downloaderconfig.GetDownloaderConfig(baseViper, "")
if err != nil {
return err
}

keyring, err := kr.NewKeyring(baseViper)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

// now let's determine if there is less nip's to process than number of
// declared workers. if so - let's decrement it to not waste resources
if len(nipNumbers) < numWorkers {
numWorkers = len(nipNumbers)
}

// start the workers
var wg sync.WaitGroup
wg.Add(numWorkers)
nipChannel := make(chan string, len(nipNumbers))
errChannel := make(chan downloadError)

for range numWorkers {
go downloadWorker(cmd, &wg, baseViper, downloaderConfig, keyring, nipChannel, errChannel)
}

// start the error listening function
var errors []downloadError
go func() {
for err := range errChannel {
errors = append(errors, err)
}
}()

// pass nip numbers to the workers
for _, nip := range nipNumbers {
nipChannel <- nip
}

// now we can close the channel. the worker will detect this as a signal that
// there are no more incoming NIP's and that it can return
close(nipChannel)

// now we can wait for the workers to finish their work
logging.DownloadLogger.Info("Oczekiwanie na zakończenie pobierania faktur")
wg.Wait()
close(errChannel)

if len(errors) > 0 {
logging.DownloadLogger.Info("Podczas pobierania wystąpiły następujące błędy")
for _, err := range errors {
logging.DownloadLogger.Error("Błąd pobierania", "NIP", err.NIP, "error", err)
}
}

return nil
}

func downloadWorker(
cmd *cobra.Command,
wg *sync.WaitGroup,
baseViper *viper.Viper,
downloaderConfig invoices.DownloadParams,
keyring kr.Keyring,
nipChannel <-chan string,
errorsChannel chan<- downloadError,
) {
defer wg.Done()

for nip := range nipChannel {
vip := cloneViper(baseViper)
runtime.SetNIP(vip, nip)

if err := doDownload(cmd, vip, nip, downloaderConfig, keyring); err != nil {
errorsChannel <- downloadError{
NIP: nip,
err: err,
}
}
}
}

// this may seem weird, but it's actually easier to make this function look "almost" natural
// and return the error. This way, the "inner" function can focus on the actual logic of downloading invoices
// whereas the wrapper (i.e. worker in the worker pool) can focus on dealing with error channel and so on.
// from the perspective of this function - there's nothing magical - it just instantiates the KSeFClient
// and uses defer to close it.
func doDownload(
cmd *cobra.Command,
vip *viper.Viper,
nip string,
downloaderConfig invoices.DownloadParams,
keyring kr.Keyring,
) error {
ksefClient, err := client.InitClient(cmd, vip, keyring)
if err != nil {
return err
}
defer ksefClient.Close()

invoicesDB, err := invoicesdb.OpenForNIP(
nip, vip, invoicesdb.WithKSeFClient(ksefClient),
)
if err != nil {
return err
}

return invoicesDB.DownloadInvoices(cmd.Context(), vip, downloaderConfig)
}
12 changes: 11 additions & 1 deletion cmd/ksef/commands/invoices/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
invoicesdbconfig "ksef/internal/invoicesdb/config"
statuscheckerconfig "ksef/internal/invoicesdb/status-checker/config"
uploaderconfig "ksef/internal/invoicesdb/uploader/config"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"
inputprocessors "ksef/internal/sei/input_processors"

Expand Down Expand Up @@ -40,7 +42,15 @@ func init() {

func importRun(cmd *cobra.Command, args []string) error {
vip := viper.GetViper()
ksefClient, err := client.InitClient(cmd, v2.WithoutTokenManager())

keyring, err := kr.NewKeyring(vip)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

ksefClient, err := client.InitClient(cmd, vip, keyring, v2.WithoutTokenManager())
if err != nil {
return errors.Join(errClientInit, err)
}
Expand Down
11 changes: 10 additions & 1 deletion cmd/ksef/commands/invoices/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"ksef/cmd/ksef/flags"
"ksef/internal/invoicesdb"
"ksef/internal/invoicesdb/config"
kr "ksef/internal/keyring"
"ksef/internal/logging"
"ksef/internal/runtime"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -36,7 +38,14 @@ func syncInvoicesRun(cmd *cobra.Command, args []string) error {
return err
}

ksefClient, err := client.InitClient(cmd)
keyring, err := kr.NewKeyring(vip)
if err != nil {
logging.SeiLogger.Error("błąd inicjalizacji keyringu", "err", err)
return err
}
defer keyring.Close()

ksefClient, err := client.InitClient(cmd, vip, keyring)
if err != nil {
return err
}
Expand Down
Loading
Loading