/* Test deflate() on concurrently modified next_in.
 *
 * Plain zlib does not document that this is supported, but in practice it tolerates this, and QEMU live migration is
 * known to rely on this. Make sure zlib-ng tolerates this as well.
 */

#include "zbuild.h"
#ifdef ZLIB_COMPAT
#  include "zlib.h"
#else
#  include "zlib-ng.h"
#endif

#include <gtest/gtest.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <thread>

/* buf:  input; its first half is compressed while being mutated, its second half while stable.
 * zbuf: compressed output of a single deflate() call.
 * tmp:  decompressed output of a single inflate() call.
 */
static uint8_t buf[8 * 1024];
static uint8_t zbuf[4 * 1024];
static uint8_t tmp[8 * 1024];

/* Thread that increments all bytes in buf by 1.
 *
 * The thread starts paused. resume() and pause() block until the worker has acknowledged the requested state, so
 * once pause() returns the worker is no longer writing to buf. The writes to buf are deliberately unsynchronized
 * with the reader: racing with deflate() is the whole point of this test.
 */
class Mutator {
    enum class State {
        PAUSED,
        RUNNING,
        STOPPED,
    };

public:
    Mutator() : m_state(State::PAUSED), m_target_state(State::PAUSED), m_thread(&Mutator::run, this) {}

    /* Stops the worker and waits for it to exit. */
    ~Mutator() {
        transition(State::STOPPED);
        m_thread.join();
    }

    /* Blocks until the worker has stopped mutating buf. */
    void pause() {
        transition(State::PAUSED);
    }

    /* Blocks until the worker has acknowledged that it should mutate buf. */
    void resume() {
        transition(State::RUNNING);
    }

private:
    /* Worker loop: publish the requested state, then act on it. */
    void run() {
        while (true) {
            /* Only this thread writes m_state, so the value stored is the value acted upon below. */
            const State state = m_target_state.load();
            m_state.store(state);
            if (state == State::PAUSED) {
                /* Nothing to do; give the CPU away instead of spinning hot. */
                std::this_thread::yield();
                continue;
            }
            if (state == State::STOPPED)
                break;
            for (uint8_t &byte : buf)
                byte++;
        }
    }

    /* Requests target_state and waits until the worker has published it. */
    void transition(State target_state) {
        m_target_state = target_state;
        while (m_state != target_state) {
            /* Let the worker run; matters most when both threads share one CPU. */
            std::this_thread::yield();
        }
    }

    /* m_thread must be declared last: members are initialized in declaration order, and the worker reads both
     * atomics as soon as it starts.
     */
    std::atomic<State> m_state, m_target_state;
    std::thread m_thread;
};

/* Repeatedly compresses buf for about one second: the first half while the Mutator is modifying it (the
 * decompressed result is discarded), the second half after the Mutator is paused (the decompressed result must
 * match buf).
 */
TEST(deflate, concurrency) {
#ifdef S390_DFLTCC_DEFLATE
    GTEST_SKIP() << "Known to be broken with S390_DFLTCC_DEFLATE";
#endif

    /* Create reusable mutator and streams. */
    Mutator mutator;

    PREFIX3(stream) dstrm;
    memset(&dstrm, 0, sizeof(dstrm));
    int err = PREFIX(deflateInit2)(&dstrm, Z_BEST_SPEED, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY);
    ASSERT_EQ(Z_OK, err) << dstrm.msg;

    PREFIX3(stream) istrm;
    memset(&istrm, 0, sizeof(istrm));
    err = PREFIX(inflateInit2)(&istrm, -15);
    ASSERT_EQ(Z_OK, err) << istrm.msg;

    /* Iterate for a certain amount of time. */
    auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(1);
    while (std::chrono::steady_clock::now() < deadline) {
        /* Start each iteration with a fresh stream state. */
        err = PREFIX(deflateReset)(&dstrm);
        ASSERT_EQ(Z_OK, err) << dstrm.msg;

        err = PREFIX(inflateReset)(&istrm);
        ASSERT_EQ(Z_OK, err) << istrm.msg;

        /* Mutate and compress the first half of buf concurrently.
         * Decompress and throw away the results, which are unpredictable.
         */
        mutator.resume();
        dstrm.next_in = buf;
        dstrm.avail_in = sizeof(buf) / 2;
        while (dstrm.avail_in > 0) {
            dstrm.next_out = zbuf;
            dstrm.avail_out = sizeof(zbuf);
            err = PREFIX(deflate)(&dstrm, Z_NO_FLUSH);
            ASSERT_EQ(Z_OK, err) << dstrm.msg;

            istrm.next_in = zbuf;
            istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
            while (istrm.avail_in > 0) {
                istrm.next_out = tmp;
                istrm.avail_out = sizeof(tmp);
                err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
                ASSERT_EQ(Z_OK, err) << istrm.msg;
            }
        }

        /* Stop mutation and compress the second half of buf.
         * Decompress and check that the result matches.
         */
        mutator.pause();
        dstrm.next_in = buf + sizeof(buf) / 2;
        dstrm.avail_in = sizeof(buf) - sizeof(buf) / 2;

        /* Keep calling deflate(Z_FINISH) until it reports Z_STREAM_END. Stopping as soon as avail_in reaches 0
         * would drop any output that did not fit into zbuf, leaving the tail of the stream unverified.
         */
        bool deflate_done = false;
        while (!deflate_done) {
            dstrm.next_out = zbuf;
            dstrm.avail_out = sizeof(zbuf);
            err = PREFIX(deflate)(&dstrm, Z_FINISH);
            if (err == Z_STREAM_END) {
                ASSERT_EQ(0u, dstrm.avail_in);
                deflate_done = true;
            } else {
                ASSERT_EQ(Z_OK, err) << dstrm.msg;
            }

            istrm.next_in = zbuf;
            istrm.avail_in = sizeof(zbuf) - dstrm.avail_out;
            while (istrm.avail_in > 0) {
                size_t orig_total_out = istrm.total_out;
                istrm.next_out = tmp;
                istrm.avail_out = sizeof(tmp);
                err = PREFIX(inflate)(&istrm, Z_NO_FLUSH);
                if (err == Z_STREAM_END) {
                    ASSERT_EQ(0u, istrm.avail_in);
                } else {
                    ASSERT_EQ(Z_OK, err) << istrm.msg;
                }

                /* The first concurrent_size decompressed bytes come from the mutated half and are unpredictable.
                 * Compare only the part of this inflate() output that lies beyond them.
                 */
                size_t concurrent_size = sizeof(buf) / 2;
                if (istrm.total_out > concurrent_size) {
                    size_t tmp_offset, buf_offset, size;
                    if (orig_total_out >= concurrent_size) {
                        /* This whole chunk belongs to the stable half. */
                        tmp_offset = 0;
                        buf_offset = orig_total_out - concurrent_size;
                        size = istrm.total_out - orig_total_out;
                    } else {
                        /* This chunk straddles the boundary; skip its unpredictable prefix. */
                        tmp_offset = concurrent_size - orig_total_out;
                        buf_offset = 0;
                        size = istrm.total_out - concurrent_size;
                    }
                    ASSERT_EQ(0, memcmp(tmp + tmp_offset, buf + sizeof(buf) / 2 + buf_offset, size));
                }
            }
        }
    }

    err = PREFIX(inflateEnd)(&istrm);
    ASSERT_EQ(Z_OK, err) << istrm.msg;

    err = PREFIX(deflateEnd)(&dstrm);
    ASSERT_EQ(Z_OK, err) << dstrm.msg;
}