@@ -68,6 +68,35 @@ static void clear_output_num()
6868 set_output_num (0 );
6969}
7070
71+ struct test_log_verifier {
72+ const char * expected ;
73+ size_t expected_len ;
74+ int records ;
75+ int valid_matches ;
76+ };
77+
78+ static void reset_log_verifier (struct test_log_verifier * verifier ,
79+ const char * expected ,
80+ size_t expected_len )
81+ {
82+ pthread_mutex_lock (& result_mutex );
83+ verifier -> expected = expected ;
84+ verifier -> expected_len = expected_len ;
85+ verifier -> records = 0 ;
86+ verifier -> valid_matches = 0 ;
87+ pthread_mutex_unlock (& result_mutex );
88+ }
89+
90+ static void get_log_verifier (struct test_log_verifier * verifier ,
91+ int * records ,
92+ int * valid_matches )
93+ {
94+ pthread_mutex_lock (& result_mutex );
95+ * records = verifier -> records ;
96+ * valid_matches = verifier -> valid_matches ;
97+ pthread_mutex_unlock (& result_mutex );
98+ }
99+
71100static int cb_count_msgpack (void * record , size_t size , void * data )
72101{
73102 msgpack_unpacked result ;
@@ -77,6 +106,17 @@ static int cb_count_msgpack(void *record, size_t size, void *data)
77106 flb_error ("data is NULL" );
78107 }
79108
109+ if (!TEST_CHECK (record != NULL )) {
110+ flb_error ("record is NULL" );
111+ return -1 ;
112+ }
113+
114+ if (!TEST_CHECK (size > 0 )) {
115+ flb_error ("record size is zero" );
116+ flb_free (record );
117+ return -1 ;
118+ }
119+
80120 /* Iterate each item array and apply rules */
81121 msgpack_unpacked_init (& result );
82122 while (msgpack_unpack_next (& result , record , size , & off ) == MSGPACK_UNPACK_SUCCESS ) {
@@ -90,6 +130,91 @@ static int cb_count_msgpack(void *record, size_t size, void *data)
90130 return 0 ;
91131}
92132
133+ static int cb_check_large_record_msgpack (void * record , size_t size , void * data )
134+ {
135+ msgpack_unpacked result ;
136+ msgpack_object root ;
137+ msgpack_object * map ;
138+ msgpack_object_kv * kv ;
139+ size_t off = 0 ;
140+ int i ;
141+ struct test_log_verifier * verifier = data ;
142+
143+ if (!TEST_CHECK (verifier != NULL )) {
144+ flb_error ("verifier is NULL" );
145+ if (record != NULL ) {
146+ flb_free (record );
147+ }
148+ return -1 ;
149+ }
150+
151+ if (!TEST_CHECK (record != NULL )) {
152+ flb_error ("record is NULL" );
153+ return -1 ;
154+ }
155+
156+ if (!TEST_CHECK (size > 0 )) {
157+ flb_error ("record size is zero" );
158+ flb_free (record );
159+ return -1 ;
160+ }
161+
162+ msgpack_unpacked_init (& result );
163+
164+ while (msgpack_unpack_next (& result , record , size , & off ) == MSGPACK_UNPACK_SUCCESS ) {
165+ root = result .data ;
166+ map = NULL ;
167+
168+ if (root .type == MSGPACK_OBJECT_ARRAY && root .via .array .size == 2 ) {
169+ if (root .via .array .ptr [1 ].type == MSGPACK_OBJECT_MAP ) {
170+ map = & root .via .array .ptr [1 ];
171+ }
172+ }
173+ else if (root .type == MSGPACK_OBJECT_MAP ) {
174+ map = & root ;
175+ }
176+
177+ pthread_mutex_lock (& result_mutex );
178+ verifier -> records ++ ;
179+ pthread_mutex_unlock (& result_mutex );
180+
181+ if (map == NULL ) {
182+ continue ;
183+ }
184+
185+ for (i = 0 ; i < map -> via .map .size ; i ++ ) {
186+ kv = & map -> via .map .ptr [i ];
187+
188+ if (kv -> key .type != MSGPACK_OBJECT_STR ) {
189+ continue ;
190+ }
191+
192+ if (kv -> key .via .str .size != 3 ||
193+ strncmp (kv -> key .via .str .ptr , "log" , 3 ) != 0 ) {
194+ continue ;
195+ }
196+
197+ if (kv -> val .type != MSGPACK_OBJECT_STR ) {
198+ continue ;
199+ }
200+
201+ if (kv -> val .via .str .size == verifier -> expected_len &&
202+ memcmp (kv -> val .via .str .ptr ,
203+ verifier -> expected ,
204+ verifier -> expected_len ) == 0 ) {
205+ pthread_mutex_lock (& result_mutex );
206+ verifier -> valid_matches ++ ;
207+ pthread_mutex_unlock (& result_mutex );
208+ }
209+ }
210+ }
211+
212+ msgpack_unpacked_destroy (& result );
213+ flb_free (record );
214+
215+ return 0 ;
216+ }
217+
93218/* Callback to check expected results */
94219static int cb_check_result_json (void * record , size_t size , void * data )
95220{
@@ -613,12 +738,110 @@ void flb_test_issue_5336()
613738 test_ctx_destroy (ctx );
614739}
615740
741+
742+ void flb_test_format_none_large_record ()
743+ {
744+ struct flb_lib_out_cb cb_data ;
745+ struct test_ctx * ctx ;
746+ flb_sockfd_t fd ;
747+ int ret ;
748+ int records ;
749+ int valid_matches ;
750+ ssize_t w_size ;
751+ struct test_log_verifier verifier ;
752+ size_t payload_size = 131072 ;
753+ char * buf ;
754+
755+ clear_output_num ();
756+
757+ buf = flb_malloc (payload_size + 3 );
758+ if (!TEST_CHECK (buf != NULL )) {
759+ TEST_MSG ("failed to allocate test payload" );
760+ exit (EXIT_FAILURE );
761+ }
762+
763+ memset (buf , 'a' , payload_size );
764+ buf [payload_size ] = ':' ;
765+ buf [payload_size + 1 ] = ';' ;
766+ buf [payload_size + 2 ] = '\0' ;
767+
768+ reset_log_verifier (& verifier , buf , payload_size + 1 );
769+
770+ cb_data .cb = cb_check_large_record_msgpack ;
771+ cb_data .data = & verifier ;
772+
773+ ctx = test_ctx_create (& cb_data );
774+ if (!TEST_CHECK (ctx != NULL )) {
775+ TEST_MSG ("test_ctx_create failed" );
776+ flb_free (buf );
777+ exit (EXIT_FAILURE );
778+ }
779+
780+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
781+ "match" , "*" ,
782+ NULL );
783+ TEST_CHECK (ret == 0 );
784+
785+ ret = flb_input_set (ctx -> flb , ctx -> i_ffd ,
786+ "format" , "none" ,
787+ "separator" , ";" ,
788+ "chunk_size" , "64KB" ,
789+ "buffer_size" , "256KB" ,
790+ NULL );
791+ TEST_CHECK (ret == 0 );
792+
793+ ret = flb_start (ctx -> flb );
794+ TEST_CHECK (ret == 0 );
795+
796+ fd = connect_tcp (NULL , -1 );
797+ if (!TEST_CHECK (fd >= 0 )) {
798+ flb_free (buf );
799+ exit (EXIT_FAILURE );
800+ }
801+
802+ w_size = 0 ;
803+ while ((size_t ) w_size < payload_size + 2 ) {
804+ ret = send (fd ,
805+ buf + w_size ,
806+ (payload_size + 2 ) - (size_t ) w_size ,
807+ 0 );
808+
809+ if (!TEST_CHECK (ret > 0 )) {
810+ TEST_MSG ("failed to send large payload, errno=%d" , errno );
811+ flb_socket_close (fd );
812+ flb_free (buf );
813+ exit (EXIT_FAILURE );
814+ }
815+
816+ w_size += ret ;
817+ }
818+
819+ TEST_CHECK (w_size == (ssize_t ) (payload_size + 2 ));
820+
821+ flb_time_msleep (1500 );
822+
823+ get_log_verifier (& verifier , & records , & valid_matches );
824+
825+ if (!TEST_CHECK (records == 1 )) {
826+ TEST_MSG ("got %d outputs, expected 1" , records );
827+ }
828+
829+ if (!TEST_CHECK (valid_matches == 1 )) {
830+ TEST_MSG ("matched payload count=%d, expected 1" , valid_matches );
831+ }
832+
833+ flb_socket_close (fd );
834+ flb_free (buf );
835+ test_ctx_destroy (ctx );
836+ }
837+
616838TEST_LIST = {
617839 {"tcp" , flb_test_tcp },
618840 {"tcp_with_source_address" , flb_test_tcp_with_source_address },
619841 {"tcp_with_tls" , flb_test_tcp_with_tls },
620842 {"format_none" , flb_test_format_none },
621843 {"format_none_separator" , flb_test_format_none_separator },
844+ {"format_none_large_record" , flb_test_format_none_large_record },
622845 {"65535_records_issue_5336" , flb_test_issue_5336 },
623846 {NULL , NULL }
624847};
0 commit comments