Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorrect Behavior: Timeout Error Returned in Rows Instead of as an Error #170

Open
Delgus opened this issue Jul 20, 2023 · 3 comments
Open

Comments

@Delgus
Copy link

Delgus commented Jul 20, 2023

Problem Description

When iterating over query results with rows.Next() and a ClickHouse timeout occurs, the error message is delivered as the data of a row (it shows up in the values filled in by rows.Scan) instead of being returned as an error from rows.Scan or rows.Err(). This behavior is non-intuitive and leads to more complex and tangled error handling.

Screenshots/Code
Here's the code example where I've encountered this problem:

package main

import (
	"context"
	"database/sql"
	"fmt"
	"strings"

	"github.com/pkg/errors"
)

// Repo groups the database handle that GetStatistic queries through.
type Repo struct {
	// dbConn is the database/sql handle every query is issued on.
	dbConn *sql.DB
	// schema is stored but not read by any code shown in this listing.
	schema string
}

// QueryStatistic is one row of the "select query,weight from my_table" result.
type QueryStatistic struct {
	// Query receives the first selected column (query).
	Query  string
	// Weight receives the second selected column (weight).
	Weight int32
}

// GetStatistic streams every row of my_table to the caller.
//
// It returns two receive-only channels. The first yields one QueryStatistic
// per row; the second yields at most one error. Both channels are closed when
// the background goroutine finishes, so a nil value read from the error
// channel after the data channel is drained means the stream completed
// without error.
//
// The goroutine stops early, reporting the context error, when ctx is
// cancelled while the consumer is not reading from the data channel.
func (r *Repo) GetStatistic(ctx context.Context) (<-chan QueryStatistic, <-chan error) {
	stats := make(chan QueryStatistic)
	// One slot of buffer: every error path sends exactly once and returns,
	// so the send never blocks even if nobody reads the error channel.
	errs := make(chan error, 1)

	go func() {
		defer close(errs)
		defer close(stats)
		query := "select query,weight from my_table"
		rows, err := r.dbConn.QueryContext(ctx, query)
		if err != nil {
			errs <- fmt.Errorf("Repo.GetStatistic: query: %v err: %w", query, err)
			return
		}
		// Release the underlying connection on every exit path, including
		// the early returns below.
		defer rows.Close()

		for rows.Next() {
			var stat QueryStatistic
			// A server-side timeout is not reported as an error by Scan;
			// see the workaround below.
			if err := rows.Scan(&stat.Query, &stat.Weight); err != nil {
				errs <- fmt.Errorf("scanStatistic: %w", err)
				return
			}

			// Duct-tape workaround: the timeout message arrives as ordinary
			// row data, so detect it by its text and turn it into an error.
			if strings.Contains(stat.Query, "DB::Exception") {
				// Example of what ends up in stat.Query:
				// Code: 159. DB::Exception: Timeout exceeded: elapsed 1836.95185707 seconds, maximum: 1800. (TIMEOUT_EXCEEDED) (version 22.8.19.10 (official build))
				errs <- errors.New(stat.Query)
				return
			}

			// Do not block forever on a consumer that stopped reading.
			select {
			case <-ctx.Done():
				errs <- fmt.Errorf("Repo.GetStatistic: %w", ctx.Err())
				return
			case stats <- stat:
			}
		}

		if err := rows.Err(); err != nil {
			errs <- fmt.Errorf("Repo.GetStatistic: rows.Err: %w", err)
			return
		}
	}()

	return stats, errs
}

Additional Information
If this is the intended behavior, please point me to the documentation that confirms it. It would also be helpful to get information on how to handle such errors without having to use "duct-tape" solutions.

@DoubleDi
Copy link
Collaborator

Hi @Delgus

That's strange; we are parsing the error text into an error struct here.

What versions of clickhouse and library are you using?

@Delgus
Copy link
Author

Delgus commented Jul 23, 2023

ClickHouse may respond with a status code of 200 even if an error occurs during the request. When dealing with large result sets, ClickHouse doesn't deliver the entire response body at once but rather streams it in chunks or packets. As a consequence, a timeout cannot be detected up front: it may be exceeded while the query is still executing and part of the response has already been sent.
example:
docker-compose.yaml

# Single ClickHouse server used to reproduce the issue locally.
version: "3"

services:        
  clickhouse:
    container_name: clickhouse
    # Same server version that appears in the timeout message quoted in the issue.
    image: clickhouse/clickhouse-server:22.8.19.10-alpine
    ports:
      # 8123 is the HTTP endpoint the Go test connects to (http://localhost:8123).
      - "8123:8123"
      # NOTE(review): 9000 and 9009 are not used by the test code shown;
      # presumably the native TCP and interserver ports — confirm.
      - "9000:9000"
      - "9009:9009"

file where we read rows in channel

package clickhouse

import (
	"context"
	"database/sql"
	"fmt"
)

// Repo groups the database handle that GetStatistic queries through.
type Repo struct {
	// dbConn is the database/sql handle every query is issued on.
	dbConn *sql.DB
	// schema is set by NewRepo but not read by any code shown in this listing.
	schema string
}

// NewRepo returns a Repo that issues its queries on dbConn and stores schema.
func NewRepo(dbConn *sql.DB, schema string) *Repo {
	repo := new(Repo)
	repo.dbConn = dbConn
	repo.schema = schema
	return repo
}

// Entity is one row read from the query_sku_add table.
type Entity struct {
	// Query receives the first column scanned by GetStatistic.
	Query string
	// Sku receives the second column scanned by GetStatistic.
	Sku   int32
}

// GetStatistic streams every row of query_sku_add to the caller.
//
// It returns two receive-only channels. The first yields one Entity per row;
// the second yields at most one error. Both channels are closed when the
// background goroutine finishes, so a nil value read from the error channel
// after the data channel is drained means the stream completed without error.
//
// The goroutine stops at the first failure: a query error, a Scan error, an
// iteration error reported by rows.Err, or cancellation of ctx while the
// consumer is not reading.
func (r *Repo) GetStatistic(ctx context.Context) (<-chan Entity, <-chan error) {
	stats := make(chan Entity)
	// One slot of buffer: every error path sends exactly once and returns,
	// so the send never blocks even if nobody reads the error channel.
	errs := make(chan error, 1)

	go func() {
		defer close(errs)
		defer close(stats)
		rows, err := r.dbConn.QueryContext(ctx, "select * from query_sku_add;")
		if err != nil {
			errs <- fmt.Errorf("Repo.GetStatistic: err: %w", err)
			return
		}

		defer rows.Close()
		for rows.Next() {
			var stat Entity
			if err := rows.Scan(&stat.Query, &stat.Sku); err != nil {
				errs <- fmt.Errorf("scanStatistic: %w", err)
				// Stop here: continuing would emit a partially scanned
				// Entity, and a second failure would block forever on the
				// already full error channel.
				return
			}

			select {
			case <-ctx.Done():
				// Report the cancellation so a truncated stream is not
				// mistaken for a complete one.
				errs <- fmt.Errorf("Repo.GetStatistic: %w", ctx.Err())
				return
			case stats <- stat:
			}
		}

		if err := rows.Err(); err != nil {
			errs <- fmt.Errorf("Repo.GetStatistic: rows.Err: %w", err)
			return
		}
	}()

	return stats, errs
}

integration test

//go:build integration
// +build integration

package clickhouse

import (
	"context"
	"database/sql"
	"fmt"
	"strconv"
	"strings"
	"testing"
	"time"

	_ "github.com/mailru/go-clickhouse/v2"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
)

// testRowsCount is the number of rows SetupSuite inserts and the number of
// rows TestGetStatistic expects to read back.
const testRowsCount = 2_000_000

func TestStatisticRepo(t *testing.T) {
	suite.Run(t, &ClickhouseSuite{})
}

// ClickhouseSuite holds the state shared by the integration tests.
type ClickhouseSuite struct {
	suite.Suite
	// clickhouse is the handle returned by newClickhouse; SetupSuite uses it
	// to create and fill the test table.
	clickhouse     *sql.DB
	// ctx is the base context, set to context.Background() in SetupSuite.
	ctx            context.Context
	// readClickhouse is the handle returned by newReadClickhouse; the test
	// reads through it.
	readClickhouse *sql.DB
}

// openClickhouse opens a "chhttp" database handle for dsn and verifies it
// with a ping. Any failure aborts the calling test via t.Fatalf; when the
// ping fails the handle is closed first so it is not leaked.
func openClickhouse(t *testing.T, dsn string) *sql.DB {
	t.Helper()

	conn, err := sql.Open("chhttp", dsn)
	if err != nil {
		t.Fatalf("conn err:%v", err)
	}

	if err := conn.Ping(); err != nil {
		// Best-effort cleanup: the ping error is the one worth reporting.
		_ = conn.Close()
		t.Fatalf("ping err:%v", err)
	}

	return conn
}

// newClickhouse returns the handle used to set up test data. It connects
// with max_execution_time=100.
func newClickhouse(t *testing.T) *sql.DB {
	t.Helper()
	return openClickhouse(t, "http://localhost:8123?max_execution_time=100")
}

// newReadClickhouse returns the handle the test reads through. It connects
// with max_execution_time=1, presumably so that reading the large table
// exceeds the limit and reproduces the reported timeout.
func newReadClickhouse(t *testing.T) *sql.DB {
	t.Helper()
	return openClickhouse(t, "http://localhost:8123?max_execution_time=1")
}

// SetupSuite prepares the shared state: it stores a background context,
// opens both database handles, then drops, recreates and fills the
// query_sku_add table with testRowsCount rows. Any failing statement fails
// the suite.
func (suite *ClickhouseSuite) SetupSuite() {
	suite.ctx = context.Background()
	suite.clickhouse = newClickhouse(suite.T())
	suite.readClickhouse = newReadClickhouse(suite.T())

	// Executed in order: drop, create, bulk insert.
	statements := []string{
		"drop table if exists query_sku_add;",
		`create table query_sku_add
			(
    			query               String,
    			sku                 Int32
			)
    		engine = MergeTree()
        	ORDER BY query
        	SETTINGS index_granularity = 8192;`,
		bulkInsert(testRowsCount),
	}

	for idx := range statements {
		_, execErr := suite.clickhouse.ExecContext(suite.ctx, statements[idx])
		require.NoError(suite.T(), execErr)
	}
}

// TestGetStatistic reads the whole table through the read connection and
// expects no error and exactly testRowsCount rows. It prints the Query field
// of the last row received, which is where the issue reports the timeout
// text turning up.
func (suite *ClickhouseSuite) TestGetStatistic() {
	// arrange
	require := suite.Require()
	ctx, cancel := context.WithTimeout(suite.ctx, time.Minute*10)
	defer cancel()

	r := NewRepo(suite.readClickhouse, "")
	var actualStats []Entity

	// act
	in, errs := r.GetStatistic(ctx)
	// collected is closed once the collector has drained the data channel,
	// so actualStats is only read after the last append.
	collected := make(chan struct{})
	go func() {
		defer close(collected)
		for i := range in {
			actualStats = append(actualStats, i)
		}
	}()

	var err error
	select {
	case <-ctx.Done():
		err = ctx.Err()
	case err = <-errs:
		cancel()
	}
	// Wait for the collector before touching actualStats; reading it while
	// the goroutine may still append is a data race.
	<-collected

	// assert
	require.NoError(err)
	// Guard the index below against an empty result.
	require.NotEmpty(actualStats)
	fmt.Println(actualStats[len(actualStats)-1].Query) // error in last row
	require.Equal(testRowsCount, len(actualStats))
}

// bulkInsert builds a single INSERT statement for query_sku_add holding
// rowsCount rows. Row i is ('i', i): the decimal text of i as the query
// column and i itself as the sku column.
//
// It returns the empty string when rowsCount is zero or negative, because an
// INSERT without any value tuple is not a valid statement.
func bulkInsert(rowsCount int) string {
	if rowsCount <= 0 {
		return ""
	}

	var query strings.Builder
	query.WriteString("INSERT INTO query_sku_add(*) VALUES ")

	for i := 0; i < rowsCount; i++ {
		// Separator goes before every tuple except the first, so there is
		// no trailing comma to trim afterwards.
		if i > 0 {
			query.WriteByte(',')
		}

		n := strconv.Itoa(i)
		query.WriteString("('")
		query.WriteString(n)
		query.WriteString("', ")
		query.WriteString(n)
		query.WriteByte(')')
	}

	query.WriteByte(';')

	return query.String()
}

I think it's necessary to add a check here - https://github.com/mailru/go-clickhouse/blob/master/rows.go#L79

@Delgus
Copy link
Author

Delgus commented Jul 23, 2023

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants